Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6BD7B200D0C for ; Wed, 20 Sep 2017 18:42:53 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 69FB61609D8; Wed, 20 Sep 2017 16:42:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D1FF41609E4 for ; Wed, 20 Sep 2017 18:42:50 +0200 (CEST) Received: (qmail 68235 invoked by uid 500); 20 Sep 2017 16:42:50 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 68126 invoked by uid 99); 20 Sep 2017 16:42:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Sep 2017 16:42:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 92768F5818; Wed, 20 Sep 2017 16:42:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lburgazzoli@apache.org To: commits@camel.apache.org Date: Wed, 20 Sep 2017 16:42:53 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [5/6] camel git commit: CAMEL-10026: HealthCheck API archived-at: Wed, 20 Sep 2017 16:42:53 -0000 http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/impl/health/DefaultHealthCheckService.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/health/DefaultHealthCheckService.java b/camel-core/src/main/java/org/apache/camel/impl/health/DefaultHealthCheckService.java new file mode 100644 index 0000000..8fbbd0c --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/health/DefaultHealthCheckService.java @@ -0,0 +1,280 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.health; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.StampedLock; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; + +import org.apache.camel.CamelContext; +import org.apache.camel.health.HealthCheck; +import org.apache.camel.health.HealthCheckHelper; +import org.apache.camel.health.HealthCheckRegistry; +import org.apache.camel.health.HealthCheckService; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.concurrent.LockHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class DefaultHealthCheckService extends ServiceSupport implements HealthCheckService { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultHealthCheckService.class); + + private final ConcurrentMap checks; + private final ConcurrentMap> options; + private final List> listeners; + private final StampedLock lock; + + private CamelContext camelContext; + private ScheduledExecutorService executorService; + private long checkInterval; + private TimeUnit checkIntervalUnit; + private volatile HealthCheckRegistry registry; + private volatile ScheduledFuture future; + + public DefaultHealthCheckService() { + this(null); + } + + public DefaultHealthCheckService(CamelContext camelContext) { + this.checks = new ConcurrentHashMap<>(); + this.options = new ConcurrentHashMap<>(); + this.listeners = new ArrayList<>(); + this.lock = new StampedLock(); + + this.camelContext = camelContext; + this.checkInterval = 30; + this.checkIntervalUnit = TimeUnit.SECONDS; + } + + // ************************************ + // Properties + // ************************************ + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + public HealthCheckRegistry getHealthCheckRegistry() { + return registry; + } + + public void setHealthCheckRegistry(HealthCheckRegistry registry) { + this.registry = registry; + } + + public long getCheckInterval() { + return checkInterval; + } + + public void setCheckInterval(long checkInterval) { + this.checkInterval = checkInterval; + } + + public void setCheckInterval(long interval, TimeUnit intervalUnit) { + setCheckInterval(interval); + setCheckIntervalUnit(intervalUnit); + } + + public TimeUnit getCheckIntervalUnit() { + return checkIntervalUnit; + } + + public void setCheckIntervalUnit(TimeUnit checkIntervalUnit) { + this.checkIntervalUnit = checkIntervalUnit; + } + + @Override + public void addStateChangeListener(BiConsumer consumer) { + LockHelper.doWithWriteLock( + lock, + () -> listeners.add(consumer) + ); + } + + @Override + public void removeStateChangeListener(BiConsumer consumer) { + LockHelper.doWithWriteLock( + lock, + () -> listeners.removeIf(listener -> listener.equals(consumer)) + ); + } + + @Override + public void setHealthCheckOptions(String id, Map options) { + options.put(id, options); + } + + @Override + public Optional call(String id) { + return call(id, options.getOrDefault(id, Collections.emptyMap())); + } + + @Override + public Optional call(String id, Map options) { + return registry.getCheck(id).map(check -> invoke(check, options)); + } + + @Override + public void notify(HealthCheck check, HealthCheck.Result result) { + LockHelper.doWithWriteLock( + lock, + () -> processResult(check, result) + ); + } + + @Override + public Collection getResults() { + return new ArrayList<>(this.checks.values()); + } + + // ************************************ + // Lifecycle + // ************************************ + + @Override + protected void doStart() throws Exception { + ObjectHelper.notNull(camelContext, "CamelContext"); + + if (executorService == null) { + executorService = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "DefaultHealthCheckService"); + } + if (future != null) { + future.cancel(true); + } + if (registry == null) { + registry = camelContext.getHealthCheckRegistry(); + } + + if (ObjectHelper.isNotEmpty(registry) && ObjectHelper.isEmpty(future)) { + // Start the health check task only if the health check registry + // has been registered. + LOGGER.debug("Schedule health-checks to be executed every %d (%s)", checkInterval, checkIntervalUnit.name()); + future = executorService.scheduleAtFixedRate( + () -> { + if (!isRunAllowed()) { + // do not invoke the check if the service is not yet + // properly started. + return; + } + + LOGGER.debug("Invoke health-checks (scheduled)"); + + registry.stream() + .collect(Collectors.groupingBy(HealthCheckHelper::getGroup)) + .entrySet().stream() + .map(Map.Entry::getValue) + .flatMap(Collection::stream) + .sorted(Comparator.comparingInt(HealthCheck::getOrder)) + .forEach(this::invoke); + }, + checkInterval, + checkInterval, + checkIntervalUnit); + } + } + + @Override + protected void doStop() throws Exception { + if (future != null) { + future.cancel(true); + future = null; + } + if (executorService != null) { + if (camelContext != null) { + camelContext.getExecutorServiceManager().shutdownNow(executorService); + } else { + executorService.shutdownNow(); + } + executorService = null; + } + } + + // ************************************ + // Helpers + // ************************************ + + private HealthCheck.Result processResult(HealthCheck check, HealthCheck.Result result) { + final HealthCheck.Result cachedResult = checks.get(check); + + if (!isSameResult(result, cachedResult)) { + // Maybe make the listener aware of the reason, i.e. + // the service is still un-healthy but the message + // or error has changed. + listeners.forEach(listener -> listener.accept(result.getState(), check)); + } + + // replace the old result with the new one even if the + // state has not changed but the reason/error may be + // changed. + checks.put(check, result); + + return result; + } + + private HealthCheck.Result invoke(HealthCheck check) { + return invoke(check, options.getOrDefault(check.getId(), Collections.emptyMap())); + } + + private HealthCheck.Result invoke(HealthCheck check, Map options) { + return LockHelper.suppliWithWriteLock( + lock, + () -> { + LOGGER.debug("Invoke health-check {}", check.getId()); + return processResult(check, check.call(options)); + } + ); + } + + /** + * Check if two results are equals by checking only the state, this method + * does not check if the result comes from the same health check, this should + * be done by the caller. + *

+ * A future implementation should check all the parameter of the result. + */ + private boolean isSameResult(HealthCheck.Result r1, HealthCheck.Result r2) { + if (Objects.equals(r1, r2)) { + return true; + } + + if (r1 != null && r2 != null) { + return r1.getState() == r2.getState(); + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/impl/health/PerformanceCounterEvaluator.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/health/PerformanceCounterEvaluator.java b/camel-core/src/main/java/org/apache/camel/impl/health/PerformanceCounterEvaluator.java new file mode 100644 index 0000000..5bf780f --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/health/PerformanceCounterEvaluator.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.health; + +import java.util.Map; + +import org.apache.camel.api.management.mbean.ManagedPerformanceCounterMBean; +import org.apache.camel.health.HealthCheckResultBuilder; + +@FunctionalInterface +public interface PerformanceCounterEvaluator { + /** + * Check the given performance counter. + */ + void test(T counter, HealthCheckResultBuilder builder, Map options); +} http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/impl/health/RegistryRepository.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/health/RegistryRepository.java b/camel-core/src/main/java/org/apache/camel/impl/health/RegistryRepository.java new file mode 100644 index 0000000..52fc3e4 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/health/RegistryRepository.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.health; + +import java.util.stream.Stream; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.health.HealthCheck; +import org.apache.camel.health.HealthCheckRepository; + +public class RegistryRepository implements CamelContextAware, HealthCheckRepository { + private volatile CamelContext context; + + @Override + public void setCamelContext(CamelContext camelContext) { + this.context = camelContext; + } + + @Override + public CamelContext getCamelContext() { + return context; + } + + @Override + public Stream stream() { + return this.context != null + ? this.context.getRegistry().findByType(HealthCheck.class).stream() + : Stream.empty(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/impl/health/RouteHealthCheck.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/health/RouteHealthCheck.java b/camel-core/src/main/java/org/apache/camel/impl/health/RouteHealthCheck.java new file mode 100644 index 0000000..8d7a198 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/health/RouteHealthCheck.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.health; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Route; +import org.apache.camel.ServiceStatus; +import org.apache.camel.api.management.mbean.ManagedRouteMBean; +import org.apache.camel.health.HealthCheckResultBuilder; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RouteHealthCheck extends AbstractHealthCheck { + private static final Logger LOGGER = LoggerFactory.getLogger(RouteHealthCheck.class); + + private final Route route; + private final List> evaluators; + + public RouteHealthCheck(Route route) { + this(route, null); + } + + public RouteHealthCheck(Route route, Collection> evaluators) { + super("camel", "route:" + route.getId()); + + this.route = route; + + if (ObjectHelper.isNotEmpty(evaluators)) { + this.evaluators = new ArrayList<>(evaluators); + } else { + this.evaluators = Collections.emptyList(); + } + } + + @Override + protected void doCall(HealthCheckResultBuilder builder, Map options) { + if (route.getId() != null) { + final CamelContext context = route.getRouteContext().getCamelContext(); + final ServiceStatus status = context.getRouteStatus(route.getId()); + + builder.detail("route.id", route.getId()); + builder.detail("route.status", status.name()); + builder.detail("route.context.name", context.getName()); + + if (route.getRouteContext().getRouteController() != null || route.getRouteContext().isAutoStartup()) { + if (status.isStarted()) { + builder.up(); + } else if (status.isStopped()) { + builder.down(); + builder.message(String.format("Route %s has status %s", route.getId(), status.name())); + } + } else { + LOGGER.debug("Route {} marked as UP (controlled={}, auto-startup={})", + route.getId(), + route.getRouteContext().getRouteController() != null, + route.getRouteContext().isAutoStartup() + ); + + // Assuming that if no route controller is configured or if a + // route is configured to not to automatically start, then the + // route is always up as it is externally managed. + builder.up(); + } + + if (builder.state() != State.DOWN) { + // If JMX is enabled, use the Managed MBeans to determine route + // health based on performance counters. + ManagedRouteMBean managedRoute = context.getManagedRoute(route.getId(), ManagedRouteMBean.class); + + if (managedRoute != null && !evaluators.isEmpty()) { + Map details = new HashMap<>(); + + for (PerformanceCounterEvaluator evaluator : evaluators) { + details.clear(); + + evaluator.test(managedRoute, builder, options); + + if (builder.state() == State.DOWN) { + break; + } + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/impl/health/RoutePerformanceCounterEvaluators.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/health/RoutePerformanceCounterEvaluators.java b/camel-core/src/main/java/org/apache/camel/impl/health/RoutePerformanceCounterEvaluators.java new file mode 100644 index 0000000..f50adf5 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/health/RoutePerformanceCounterEvaluators.java @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.health; + +import java.util.Map; + +import org.apache.camel.api.management.mbean.ManagedRouteMBean; +import org.apache.camel.health.HealthCheckResultBuilder; + +public final class RoutePerformanceCounterEvaluators { + + private RoutePerformanceCounterEvaluators() { + } + + // ******************************** + // Helpers + // ******************************** + + public static PerformanceCounterEvaluator exchangesFailed(long threshold) { + return new ExchangesFailed(threshold); + } + + public static PerformanceCounterEvaluator exchangesInflight(long threshold) { + return new ExchangesInflight(threshold); + } + + public static PerformanceCounterEvaluator redeliveries(long threshold) { + return new Redeliveries(threshold); + } + + public static PerformanceCounterEvaluator externalRedeliveries(long threshold) { + return new ExternalRedeliveries(threshold); + } + + public static PerformanceCounterEvaluator lastProcessingTime(long timeThreshold, int failuresThreshold) { + return new LastProcessingTime(timeThreshold, failuresThreshold); + } + + public static PerformanceCounterEvaluator minProcessingTime(long timeThreshold, int failuresThreshold) { + return new MinProcessingTime(timeThreshold, failuresThreshold); + } + + public static PerformanceCounterEvaluator meanProcessingTime(long timeThreshold, int failuresThreshold) { + return new MeanProcessingTime(timeThreshold, failuresThreshold); + } + + public static PerformanceCounterEvaluator maxProcessingTime(long timeThreshold, int failuresThreshold) { + return new MaxProcessingTime(timeThreshold, failuresThreshold); + } + + // ******************************** + // Impls + // ******************************** + + public static final class ExchangesFailed implements PerformanceCounterEvaluator { + private final long threshold; + + public ExchangesFailed(long threshold) { + this.threshold = threshold; + } + + @Override + public void test(ManagedRouteMBean counter, HealthCheckResultBuilder builder, Map options) { + try { + long value = counter.getExchangesFailed(); + if (value > threshold) { + builder.down(); + } + + builder.detail("exchanges.failed", value); + builder.detail("exchanges.failed.threshold", threshold); + } catch (Exception e) { + } + } + } + + public static final class ExchangesInflight implements PerformanceCounterEvaluator { + private final long threshold; + + public ExchangesInflight(long threshold) { + this.threshold = threshold; + } + + @Override + public void test(ManagedRouteMBean counter, HealthCheckResultBuilder builder, Map options) { + try { + long value = counter.getExchangesInflight(); + if (value > threshold) { + builder.down(); + } + + builder.detail("exchanges.inflight", value); + builder.detail("exchanges.inflight.threshold", threshold); + } catch (Exception e) { + } + } + } + + public static final class Redeliveries implements PerformanceCounterEvaluator { + private final long threshold; + + public Redeliveries(long threshold) { + this.threshold = threshold; + } + + @Override + public void test(ManagedRouteMBean counter, HealthCheckResultBuilder builder, Map options) { + try { + long value = counter.getRedeliveries(); + if (value > threshold) { + builder.down(); + } + + builder.detail("exchanges.redeliveries", value); + builder.detail("exchanges.redeliveries.threshold", threshold); + } catch (Exception e) { + } + } + } + + public static final class ExternalRedeliveries implements PerformanceCounterEvaluator { + private final long threshold; + + public ExternalRedeliveries(long threshold) { + this.threshold = threshold; + } + + @Override + public void test(ManagedRouteMBean counter, HealthCheckResultBuilder builder, Map options) { + try { + long value = counter.getExternalRedeliveries(); + if (value > threshold) { + builder.down(); + } + + builder.detail("exchanges.external-redeliveries", value); + builder.detail("exchanges.external-redeliveries.threshold", threshold); + } catch (Exception e) { + } + } + } + + public static final class LastProcessingTime implements PerformanceCounterEvaluator { + private final long timeThreshold; + private final int failuresThreshold; + + private volatile int failureCount; + + public LastProcessingTime(long timeThreshold, int failuresThreshold) { + this.timeThreshold = timeThreshold; + this.failuresThreshold = failuresThreshold; + } + + @Override + public void test(ManagedRouteMBean counter, HealthCheckResultBuilder builder, Map options) { + try { + long value = counter.getLastProcessingTime(); + if (value > timeThreshold) { + failureCount++; + + if (failureCount > failuresThreshold) { + builder.down(); + } + } else { + failureCount = 0; + } + + builder.detail("exchanges.last-processing-time", value); + builder.detail("exchanges.last-processing-time.threshold.time", timeThreshold); + builder.detail("exchanges.last-processing-time.threshold.failures", failuresThreshold); + } catch (Exception e) { + } + } + } + + public static final class MinProcessingTime implements PerformanceCounterEvaluator { + private final long timeThreshold; + private final int failuresThreshold; + + private volatile int failureCount; + + public MinProcessingTime(long timeThreshold, int failuresThreshold) { + this.timeThreshold = timeThreshold; + this.failuresThreshold = failuresThreshold; + } + + @Override + public void test(ManagedRouteMBean counter, HealthCheckResultBuilder builder, Map options) { + try { + long value = counter.getMinProcessingTime(); + if (value > timeThreshold) { + failureCount++; + + if (failureCount > failuresThreshold) { + builder.down(); + } + } else { + failureCount = 0; + } + + builder.detail("exchanges.min-processing-time", value); + builder.detail("exchanges.min-processing-time.threshold.time", timeThreshold); + builder.detail("exchanges.min-processing-time.threshold.failures", failuresThreshold); + } catch (Exception e) { + } + } + } + + public static final class MeanProcessingTime implements PerformanceCounterEvaluator { + private final long timeThreshold; + private final int failuresThreshold; + + private volatile int failureCount; + + public MeanProcessingTime(long timeThreshold, int failuresThreshold) { + this.timeThreshold = timeThreshold; + this.failuresThreshold = failuresThreshold; + } + + @Override + public void test(ManagedRouteMBean counter, HealthCheckResultBuilder builder, Map options) { + try { + long value = counter.getMeanProcessingTime(); + if (value > timeThreshold) { + failureCount++; + + if (failureCount > failuresThreshold) { + builder.down(); + } + } else { + failureCount = 0; + } + + builder.detail("exchanges.mean-processing-time", value); + builder.detail("exchanges.mean-processing-time.threshold.time", timeThreshold); + builder.detail("exchanges.mean-processing-time.threshold.failures", failuresThreshold); + } catch (Exception e) { + } + } + } + + public static final class MaxProcessingTime implements PerformanceCounterEvaluator { + private final long timeThreshold; + private final int failuresThreshold; + + private volatile int failureCount; + + public MaxProcessingTime(long timeThreshold, int failuresThreshold) { + this.timeThreshold = timeThreshold; + this.failuresThreshold = failuresThreshold; + } + + @Override + public void test(ManagedRouteMBean counter, HealthCheckResultBuilder builder, Map options) { + try { + long value = counter.getMaxProcessingTime(); + if (value > timeThreshold) { + failureCount++; + + if (failureCount > failuresThreshold) { + builder.down(); + } + } else { + failureCount = 0; + } + + builder.detail("exchanges.max-processing-time", value); + builder.detail("exchanges.max-processing-time.threshold.time", timeThreshold); + builder.detail("exchanges.max-processing-time.threshold.failures", failuresThreshold); + } catch (Exception e) { + } + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/impl/health/RoutesHealthCheckRepository.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/health/RoutesHealthCheckRepository.java b/camel-core/src/main/java/org/apache/camel/impl/health/RoutesHealthCheckRepository.java new file mode 100644 index 0000000..637a805 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/health/RoutesHealthCheckRepository.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.health; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Stream; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Route; +import org.apache.camel.api.management.mbean.ManagedRouteMBean; +import org.apache.camel.health.HealthCheck; +import org.apache.camel.health.HealthCheckRepository; + +public class RoutesHealthCheckRepository implements CamelContextAware, HealthCheckRepository { + private final ConcurrentMap checks; + private Set blacklist; + private List> evaluators; + private ConcurrentMap>> evaluatorMap; + private volatile CamelContext context; + + public RoutesHealthCheckRepository() { + this.checks = new ConcurrentHashMap<>(); + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.context = camelContext; + } + + @Override + public CamelContext getCamelContext() { + return context; + } + + public void setBlacklistedRoutes(Collection blacklistedRoutes) { + blacklistedRoutes.forEach(this::addBlacklistedRoute); + } + + public void addBlacklistedRoute(String routeId) { + if (this.blacklist == null) { + this.blacklist = new HashSet<>(); + } + + this.blacklist.add(routeId); + } + + public void setEvaluators(Collection> evaluators) { + evaluators.forEach(this::addEvaluator); + } + + public void addEvaluator(PerformanceCounterEvaluator evaluator) { + if (this.evaluators == null) { + this.evaluators = new CopyOnWriteArrayList<>(); + } + + this.evaluators.add(evaluator); + } + + public void setRoutesEvaluators(Map>> evaluators) { + evaluators.forEach(this::setRouteEvaluators); + } + + public void setRouteEvaluators(String routeId, Collection> evaluators) { + evaluators.forEach(evaluator -> addRouteEvaluator(routeId, evaluator)); + } + + public void addRouteEvaluator(String routeId, PerformanceCounterEvaluator evaluator) { + if (this.evaluatorMap == null) { + this.evaluatorMap = new ConcurrentHashMap<>(); + } + + this.evaluatorMap.computeIfAbsent(routeId, id -> new CopyOnWriteArrayList<>()).add(evaluator); + } + + public Stream> evaluators() { + return this.evaluators != null + ? this.evaluators.stream() + : Stream.empty(); + } + + public Stream> evaluators(String routeId) { + return this.evaluatorMap != null + ? evaluatorMap.getOrDefault(routeId, Collections.emptyList()).stream() + : Stream.empty(); + } + + @Override + public Stream stream() { + // This is not really efficient as getRoutes() creates a copy of the routes + // array for each invocation. It would be nice to have more stream oriented + // operation on CamelContext i.e. + // + // interface CamelContext { + // + // Stream routes(); + // + // void forEachRoute(Consumer consumer); + // } + // + return this.context != null + ? this.context.getRoutes() + .stream() + .filter(route -> route.getId() != null) + .filter(route -> isNotBlacklisted(route)) + .map(this::toRouteHealthCheck) + : Stream.empty(); + } + + // ***************************** + // Helpers + // ***************************** + + private boolean isNotBlacklisted(Route route) { + return this.blacklist != null + ? !this.blacklist.contains(route.getId()) + : true; + } + + private HealthCheck toRouteHealthCheck(Route route) { + return checks.computeIfAbsent( + route, + r -> { + HealthCheck check = new RouteHealthCheck( + route, + evaluatorMap != null + ? evaluatorMap.getOrDefault(r.getId(), evaluators) + : evaluators + ); + + check.getConfiguration().setEnabled(true); + + return check; + } + ); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/impl/health/package.html ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/health/package.html b/camel-core/src/main/java/org/apache/camel/impl/health/package.html new file mode 100644 index 0000000..e142b7f --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/health/package.html @@ -0,0 +1,25 @@ + + + + + + +Camel Health Check implementations. + + + http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java index 3cf6f09..2631bb1 100644 --- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java @@ -221,6 +221,17 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement enlistPreRegisteredServices(); try { + Object me = getManagementObjectStrategy().getManagedObjectForCamelHealth(camelContext); + if (me == null) { + // endpoint should not be managed + return; + } + manageObject(me); + } catch (Exception e) { + LOG.warn("Could not register CamelHealth MBean. This exception will be ignored.", e); + } + + try { Object me = getManagementObjectStrategy().getManagedObjectForRouteController(camelContext); if (me == null) { // endpoint should not be managed @@ -299,6 +310,16 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement } try { + Object mc = getManagementObjectStrategy().getManagedObjectForCamelHealth(context); + // the context could have been removed already + if (getManagementStrategy().isManaged(mc, null)) { + unmanageObject(mc); + } + } catch (Exception e) { + LOG.warn("Could not unregister CamelHealth MBean", e); + } + + try { Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context); // the context could have been removed already if (getManagementStrategy().isManaged(mc, null)) { http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java index c78c22b..388f6d6 100644 --- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementNamingStrategy.java @@ -53,6 +53,7 @@ public class DefaultManagementNamingStrategy implements ManagementNamingStrategy public static final String KEY_CONTEXT = "context"; public static final String TYPE_CONTEXT = "context"; public static final String TYPE_ROUTE_CONTROLLER = "routecontroller"; + public static final String TYPE_HEALTH = "health"; public static final String TYPE_ENDPOINT = "endpoints"; public static final String TYPE_DATAFORMAT = "dataformats"; public static final String TYPE_PROCESSOR = "processors"; @@ -114,6 +115,23 @@ public class DefaultManagementNamingStrategy implements ManagementNamingStrategy } @Override + public ObjectName getObjectNameForCamelHealth(CamelContext context) throws MalformedObjectNameException { + // prefer to use the given management name if previously assigned + String managementName = context.getManagementName(); + if (managementName == null) { + managementName = context.getManagementNameStrategy().getName(); + } + + StringBuilder buffer = new StringBuilder(); + buffer.append(domainName).append(":"); + buffer.append(KEY_CONTEXT + "=").append(getContextId(managementName)).append(","); + buffer.append(KEY_TYPE + "=" + TYPE_HEALTH + ","); + buffer.append(KEY_NAME + "=").append(ObjectName.quote(context.getName())); + + return createObjectName(buffer); + } + + @Override public ObjectName getObjectNameForRouteController(CamelContext context) throws MalformedObjectNameException { // prefer to use the given management name if previously assigned String managementName = context.getManagementName(); http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java index 8e785f2..a7f6e34 100644 --- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java @@ -36,6 +36,7 @@ import org.apache.camel.management.mbean.ManagedAggregateProcessor; import org.apache.camel.management.mbean.ManagedBeanProcessor; import org.apache.camel.management.mbean.ManagedBrowsableEndpoint; import org.apache.camel.management.mbean.ManagedCamelContext; +import org.apache.camel.management.mbean.ManagedCamelHealth; import org.apache.camel.management.mbean.ManagedChoice; import org.apache.camel.management.mbean.ManagedCircuitBreakerLoadBalancer; import org.apache.camel.management.mbean.ManagedComponent; @@ -172,6 +173,12 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy return mc; } + public Object getManagedObjectForCamelHealth(CamelContext context) { + ManagedCamelHealth mch = new ManagedCamelHealth(context); + mch.init(context.getManagementStrategy()); + return mch; + } + @SuppressWarnings({"deprecation", "unchecked"}) public Object getManagedObjectForComponent(CamelContext context, Component component, String name) { if (component instanceof org.apache.camel.spi.ManagementAware) { http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java b/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java index 5754a50..fcb16a2 100644 --- a/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/ManagedManagementStrategy.java @@ -23,6 +23,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.management.mbean.ManagedBacklogDebugger; import org.apache.camel.management.mbean.ManagedBacklogTracer; import org.apache.camel.management.mbean.ManagedCamelContext; +import org.apache.camel.management.mbean.ManagedCamelHealth; import org.apache.camel.management.mbean.ManagedComponent; import org.apache.camel.management.mbean.ManagedConsumer; import org.apache.camel.management.mbean.ManagedDataFormat; @@ -87,9 +88,14 @@ public class ManagedManagementStrategy extends DefaultManagementStrategy { ObjectName objectName = null; + + if (managedObject instanceof ManagedCamelContext) { ManagedCamelContext mcc = (ManagedCamelContext) managedObject; objectName = getManagementNamingStrategy().getObjectNameForCamelContext(mcc.getContext()); + } else if (managedObject instanceof ManagedCamelHealth) { + ManagedCamelHealth mch = (ManagedCamelHealth) managedObject; + objectName = getManagementNamingStrategy().getObjectNameForCamelHealth(mch.getContext()); } else if (managedObject instanceof ManagedRouteController) { ManagedRouteController mrc = (ManagedRouteController) managedObject; objectName = getManagementNamingStrategy().getObjectNameForRouteController(mrc.getContext()); http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelHealth.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelHealth.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelHealth.java new file mode 100644 index 0000000..b955abc --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelHealth.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management.mbean; + +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; + +import org.apache.camel.CamelContext; +import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes; +import org.apache.camel.api.management.mbean.ManagedCamelHealthMBean; +import org.apache.camel.health.HealthCheck; +import org.apache.camel.health.HealthCheckHelper; +import org.apache.camel.health.HealthCheckRegistry; +import org.apache.camel.spi.ManagementStrategy; +import org.apache.camel.util.ObjectHelper; + +public class ManagedCamelHealth implements ManagedCamelHealthMBean { + private final CamelContext context; + + public ManagedCamelHealth(CamelContext context) { + this.context = context; + } + + public void init(ManagementStrategy strategy) { + // do nothing + } + + public CamelContext getContext() { + return context; + } + + @Override + public boolean getIsHealthy() { + for (HealthCheck.Result result: HealthCheckHelper.invoke(context)) { + if (result.getState() == HealthCheck.State.DOWN) { + return false; + } + } + + return true; + } + + @Override + public Collection getHealthChecksIDs() { + HealthCheckRegistry registry = context.getHealthCheckRegistry(); + if (registry != null) { + return registry.getCheckIDs(); + } + + return Collections.emptyList(); + } + + @Override + public TabularData details() { + try { + final TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.camelHealthDetailsTabularType()); + final CompositeType type = CamelOpenMBeanTypes.camelHealthDetailsCompositeType(); + + for (HealthCheck.Result result: HealthCheckHelper.invoke(context)) { + CompositeData data = new CompositeDataSupport( + type, + new String[] { + "id", + "group", + "state", + "enabled", + "interval", + "failureThreshold" + }, + new Object[] { + result.getCheck().getId(), + result.getCheck().getGroup(), + result.getState().name(), + result.getCheck().getConfiguration().isEnabled(), + result.getCheck().getConfiguration().getInterval() != null + ? result.getCheck().getConfiguration().getInterval().toMillis() + : null, + result.getCheck().getConfiguration().getFailureThreshold() + } + ); + + answer.put(data); + } + + return answer; + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + + @Override + public String invoke(String id) { + Optional result = HealthCheckHelper.invoke(context, id, Collections.emptyMap()); + + return result.map(r -> r.getState().name()).orElse(HealthCheck.State.UNKNOWN.name()); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/spi/GroupAware.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/GroupAware.java b/camel-core/src/main/java/org/apache/camel/spi/GroupAware.java new file mode 100644 index 0000000..d3a2081 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/GroupAware.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +/** + * To allow objects to be injected with an group. + */ +public interface GroupAware extends HasGroup { + + /** + * Sets the group + * + * @param group the group + */ + void setGroup(String group); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/spi/HasCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/HasCamelContext.java b/camel-core/src/main/java/org/apache/camel/spi/HasCamelContext.java new file mode 100644 index 0000000..bef7be4 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/HasCamelContext.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import org.apache.camel.CamelContext; + +/** + * A simple marker interface for an object which holds a CamelContext + */ +public interface HasCamelContext { + + /** + * Returns the camel context. + * + * @return the camel context. + */ + CamelContext getCamelContext(); +} http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/spi/HasGroup.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/HasGroup.java b/camel-core/src/main/java/org/apache/camel/spi/HasGroup.java new file mode 100644 index 0000000..3261dd1 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/HasGroup.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +/** + * A simple marker interface for an object which has belongs to a group Group + * which is useful for group related operation such as clustering, JMX style API + */ +public interface HasGroup { + + /** + * Returns the group + * + * @return the group + */ + String getGroup(); +} http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java index 72bb00a..d5bb88e 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/spi/ManagementNamingStrategy.java @@ -42,6 +42,8 @@ public interface ManagementNamingStrategy { ObjectName getObjectNameForCamelContext(String managementName, String name) throws MalformedObjectNameException; + ObjectName getObjectNameForCamelHealth(CamelContext context) throws MalformedObjectNameException; + ObjectName getObjectNameForCamelContext(CamelContext context) throws MalformedObjectNameException; ObjectName getObjectNameForRouteController(CamelContext context) throws MalformedObjectNameException; http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java b/camel-core/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java index a11b2b1..0b06d65 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java @@ -36,6 +36,8 @@ public interface ManagementObjectStrategy { Object getManagedObjectForCamelContext(CamelContext context); + Object getManagedObjectForCamelHealth(CamelContext context); + Object getManagedObjectForComponent(CamelContext context, Component component, String name); Object getManagedObjectForDataFormat(CamelContext context, DataFormat dataFormat); http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java b/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java index a1dc3da..4389cc1 100644 --- a/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java @@ -54,7 +54,10 @@ import org.w3c.dom.Node; import org.w3c.dom.NodeList; import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; import org.apache.camel.CamelExecutionException; +import org.apache.camel.Component; +import org.apache.camel.ComponentAware; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Ordered; @@ -2070,5 +2073,27 @@ public final class ObjectHelper { } } } + + /** + * Set the {@link CamelContext} context if the component is an instance of {@link CamelContextAware}. + */ + public static T trySetCamelContext(T object, CamelContext camelContext) { + if (object instanceof CamelContextAware) { + ((CamelContextAware) object).setCamelContext(camelContext); + } + + return object; + } + + /** + * Set the {@link Component} context if the component is an instance of {@link ComponentAware}. + */ + public static T trySetComponent(T object, Component component) { + if (object instanceof ComponentAware) { + ((ComponentAware) object).setComponent(component); + } + + return object; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/util/ReferenceCounted.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/ReferenceCounted.java b/camel-core/src/main/java/org/apache/camel/util/ReferenceCounted.java new file mode 100644 index 0000000..88516a4 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/util/ReferenceCounted.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util; + +public class ReferenceCounted { + private final ReferenceCount count; + + protected ReferenceCounted() { + this.count = ReferenceCount.onRelease(this::doRelease); + this.count.retain(); + } + + public void retain() throws IllegalStateException { + count.retain(); + } + + public void release() throws IllegalStateException { + count.release(); + } + + // cleanup + protected void doRelease() { + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/util/concurrent/LockHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/concurrent/LockHelper.java b/camel-core/src/main/java/org/apache/camel/util/concurrent/LockHelper.java index 1ba0165..9acbb96 100644 --- a/camel-core/src/main/java/org/apache/camel/util/concurrent/LockHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/concurrent/LockHelper.java @@ -18,8 +18,10 @@ package org.apache.camel.util.concurrent; import java.util.concurrent.Callable; import java.util.concurrent.locks.StampedLock; +import java.util.function.Supplier; import org.apache.camel.util.function.ThrowingRunnable; +import org.apache.camel.util.function.ThrowingSupplier; public final class LockHelper { private LockHelper() { @@ -55,6 +57,26 @@ public final class LockHelper { } } + public static R suppliWithReadLock(StampedLock lock, Supplier task) { + long stamp = lock.readLock(); + + try { + return task.get(); + } finally { + lock.unlockRead(stamp); + } + } + + public static R suppliWithReadLockT(StampedLock lock, ThrowingSupplier task) throws T { + long stamp = lock.readLock(); + + try { + return task.get(); + } finally { + lock.unlockRead(stamp); + } + } + public static void doWithWriteLock(StampedLock lock, Runnable task) { long stamp = lock.writeLock(); @@ -65,7 +87,7 @@ public final class LockHelper { } } - public static R callWithWriteLock(StampedLock lock, Callable task) throws Exception { + public static R callWithWriteLock(StampedLock lock, Callable task) throws Exception { long stamp = lock.writeLock(); try { @@ -75,6 +97,16 @@ public final class LockHelper { } } + public static R suppliWithWriteLock(StampedLock lock, Supplier task) { + long stamp = lock.writeLock(); + + try { + return task.get(); + } finally { + lock.unlockWrite(stamp); + } + } + public static void doWithWriteLockT(StampedLock lock, ThrowingRunnable task) throws T { long stamp = lock.writeLock(); @@ -84,4 +116,14 @@ public final class LockHelper { lock.unlockWrite(stamp); } } + + public static R suppliWithWriteLockT(StampedLock lock, ThrowingSupplier task) throws T { + long stamp = lock.writeLock(); + + try { + return task.get(); + } finally { + lock.unlockWrite(stamp); + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/main/java/org/apache/camel/util/function/ThrowingToLongFunction.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/function/ThrowingToLongFunction.java b/camel-core/src/main/java/org/apache/camel/util/function/ThrowingToLongFunction.java new file mode 100644 index 0000000..fb6ab5c --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/util/function/ThrowingToLongFunction.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.function; + +@FunctionalInterface +public interface ThrowingToLongFunction { + long apply(I in) throws T; +} + http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/test/java/org/apache/camel/impl/health/DefaultHealthCheckRegistryTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/health/DefaultHealthCheckRegistryTest.java b/camel-core/src/test/java/org/apache/camel/impl/health/DefaultHealthCheckRegistryTest.java new file mode 100644 index 0000000..b49a915 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/health/DefaultHealthCheckRegistryTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.health; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.camel.health.HealthCheck; +import org.apache.camel.health.HealthCheckRegistry; +import org.apache.camel.health.HealthCheckResultBuilder; +import org.junit.Assert; +import org.junit.Test; + +public class DefaultHealthCheckRegistryTest { + + @Test + public void testDefaultHealthCheckRegistry() throws Exception { + HealthCheckRegistry registry = new DefaultHealthCheckRegistry(); + registry.register(new MyHealthCheck("G1", "1")); + registry.register(new MyHealthCheck("G1", "1")); + registry.register(new MyHealthCheck("G1", "2")); + registry.register(new MyHealthCheck("G2", "3")); + + + List checks = registry.stream().collect(Collectors.toList()); + Assert.assertEquals(3, checks.size()); + + for (HealthCheck check : checks) { + HealthCheck.Result response = check.call(); + + Assert.assertEquals(HealthCheck.State.UP, response.getState()); + Assert.assertFalse(response.getMessage().isPresent()); + Assert.assertFalse(response.getError().isPresent()); + } + } + + @Test + public void testDefaultHealthCheckRegistryWithRepositories() throws Exception { + HealthCheckRegistry registry = new DefaultHealthCheckRegistry(); + registry.register(new MyHealthCheck("G1", "1")); + registry.register(new MyHealthCheck("G1", "1")); + registry.register(new MyHealthCheck("G1", "2")); + registry.register(new MyHealthCheck("G2", "3")); + + registry.addRepository(() -> Stream.of( + new MyHealthCheck("G1", "1"), + new MyHealthCheck("G1", "4") + ) + ); + + List checks = registry.stream().collect(Collectors.toList()); + Assert.assertEquals(4, checks.size()); + Assert.assertEquals(1, checks.stream().filter(h -> h.getId().equals("4")).count()); + Assert.assertEquals(3, checks.stream().filter(h -> h.getGroup().equals("G1")).count()); + + for (HealthCheck check : checks) { + HealthCheck.Result response = check.call(); + + Assert.assertEquals(HealthCheck.State.UP, response.getState()); + Assert.assertFalse(response.getMessage().isPresent()); + Assert.assertFalse(response.getError().isPresent()); + } + } + + private class MyHealthCheck extends AbstractHealthCheck { + protected MyHealthCheck(String group, String id) { + super(group, id); + getConfiguration().setEnabled(true); + } + + @Override + public void doCall(HealthCheckResultBuilder builder, Map options) { + builder.up(); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/test/java/org/apache/camel/impl/health/DefaultHealthCheckServiceTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/health/DefaultHealthCheckServiceTest.java b/camel-core/src/test/java/org/apache/camel/impl/health/DefaultHealthCheckServiceTest.java new file mode 100644 index 0000000..d7bd42e --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/health/DefaultHealthCheckServiceTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.health; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.health.HealthCheck; +import org.apache.camel.health.HealthCheckResultBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.junit.Assert; +import org.junit.Test; + +public class DefaultHealthCheckServiceTest { + + @Test(timeout = 10000) + public void testDefaultHealthCheckService() throws Exception { + CamelContext context = null; + + try { + MyHealthCheck check = new MyHealthCheck("", HealthCheck.State.UP); + List results = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(10); + + DefaultHealthCheckRegistry registry = new DefaultHealthCheckRegistry(); + registry.register(check); + + DefaultHealthCheckService service = new DefaultHealthCheckService(); + service.setCheckInterval(500, TimeUnit.MILLISECONDS); + service.addStateChangeListener((s, c) -> { + results.add(s); + check.flip(); + latch.countDown(); + }); + + context = new DefaultCamelContext(); + context.setHealthCheckRegistry(registry); + context.addService(service); + context.start(); + + latch.await(); + + for (int i = 0; i < results.size(); i++) { + if (i % 2 == 0) { + Assert.assertEquals(HealthCheck.State.UP, results.get(i)); + } else { + Assert.assertEquals(HealthCheck.State.DOWN, results.get(i)); + } + } + + Assert.assertEquals(1, service.getResults().size()); + Assert.assertEquals(check, service.getResults().iterator().next().getCheck()); + + } finally { + if (context != null) { + context.stop(); + } + } + } + + + // ******************************** + // + // ******************************** + + private class MyHealthCheck extends AbstractHealthCheck { + private HealthCheck.State state; + + MyHealthCheck(String id, HealthCheck.State state) { + super(id); + getConfiguration().setEnabled(true); + + this.state = state; + } + + public void flip() { + this.state = this.state == State.UP ? State.DOWN : State.UP; + } + + @Override + public void doCall(HealthCheckResultBuilder builder, Map options) { + builder.state(state); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/test/java/org/apache/camel/impl/health/HealthCheckTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/health/HealthCheckTest.java b/camel-core/src/test/java/org/apache/camel/impl/health/HealthCheckTest.java new file mode 100644 index 0000000..367051e --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/health/HealthCheckTest.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.health; + +import java.time.Duration; +import java.util.Map; + +import org.apache.camel.health.HealthCheck; +import org.apache.camel.health.HealthCheckResultBuilder; +import org.junit.Assert; +import org.junit.Test; + +public class HealthCheckTest { + @Test + public void testCheck() throws Exception { + MyHealthCheck check = new MyHealthCheck(); + check.setState(HealthCheck.State.UP); + + HealthCheck.Result result; + + result = check.call(); + + Assert.assertEquals(HealthCheck.State.UNKNOWN, result.getState()); + Assert.assertTrue(result.getMessage().isPresent()); + Assert.assertEquals("Disabled", result.getMessage().get()); + Assert.assertEquals(false, result.getDetails().get(MyHealthCheck.CHECK_ENABLED)); + + check.getConfiguration().setEnabled(true); + + result = check.call(); + + Assert.assertEquals(HealthCheck.State.UP, result.getState()); + Assert.assertFalse(result.getMessage().isPresent()); + Assert.assertFalse(result.getDetails().containsKey(MyHealthCheck.CHECK_ENABLED)); + } + + @Test + public void testInterval() throws Exception { + MyHealthCheck check = new MyHealthCheck(); + check.setState(HealthCheck.State.UP); + check.getConfiguration().setEnabled(true); + check.getConfiguration().setInterval(Duration.ofMillis(1000)); + + HealthCheck.Result result1 = check.call(); + Assert.assertEquals(HealthCheck.State.UP, result1.getState()); + + Thread.sleep(100); + + HealthCheck.Result result2 = check.call(); + Assert.assertEquals(HealthCheck.State.UP, result2.getState()); + Assert.assertEquals(result1.getDetails().get(MyHealthCheck.INVOCATION_TIME), result2.getDetails().get(MyHealthCheck.INVOCATION_TIME)); + Assert.assertEquals(result1.getDetails().get(MyHealthCheck.INVOCATION_COUNT), result2.getDetails().get(MyHealthCheck.INVOCATION_COUNT)); + Assert.assertNotEquals(check.getMetaData().get(MyHealthCheck.INVOCATION_ATTEMPT_TIME), result2.getDetails().get(MyHealthCheck.INVOCATION_TIME)); + + Thread.sleep(1250); + + HealthCheck.Result result3 = check.call(); + Assert.assertEquals(HealthCheck.State.UP, result3.getState()); + Assert.assertNotEquals(result2.getDetails().get(MyHealthCheck.INVOCATION_TIME), result3.getDetails().get(MyHealthCheck.INVOCATION_TIME)); + Assert.assertNotEquals(result2.getDetails().get(MyHealthCheck.INVOCATION_COUNT), result3.getDetails().get(MyHealthCheck.INVOCATION_COUNT)); + Assert.assertEquals(check.getMetaData().get(MyHealthCheck.INVOCATION_ATTEMPT_TIME), result3.getDetails().get(MyHealthCheck.INVOCATION_TIME)); + } + + @Test + public void testThreshold() throws Exception { + MyHealthCheck check = new MyHealthCheck(); + check.setState(HealthCheck.State.DOWN); + check.getConfiguration().setEnabled(true); + check.getConfiguration().setFailureThreshold(3); + + HealthCheck.Result result; + + for (int i = 0; i < check.getConfiguration().getFailureThreshold(); i++) { + result = check.call(); + + Assert.assertEquals(HealthCheck.State.UP, result.getState()); + Assert.assertEquals(i + 1, result.getDetails().get(MyHealthCheck.INVOCATION_COUNT)); + Assert.assertEquals(i + 1, result.getDetails().get(MyHealthCheck.FAILURE_COUNT)); + } + + Assert.assertEquals(HealthCheck.State.DOWN, check.call().getState()); + } + + @Test + public void testIntervalThreshold() throws Exception { + MyHealthCheck check = new MyHealthCheck(); + check.setState(HealthCheck.State.DOWN); + check.getConfiguration().setEnabled(true); + check.getConfiguration().setInterval(Duration.ofMillis(500)); + check.getConfiguration().setFailureThreshold(3); + + HealthCheck.Result result; + int icount; + int fcount; + + for (int i = 0; i < check.getConfiguration().getFailureThreshold(); i++) { + result = check.call(); + + icount = (int)result.getDetails().get(MyHealthCheck.INVOCATION_COUNT); + fcount = (int)result.getDetails().get(MyHealthCheck.FAILURE_COUNT); + + Assert.assertEquals(HealthCheck.State.UP, result.getState()); + Assert.assertEquals(i + 1, icount); + Assert.assertEquals(i + 1, fcount); + + result = check.call(); + + Assert.assertEquals(HealthCheck.State.UP, result.getState()); + Assert.assertEquals(icount, result.getDetails().get(MyHealthCheck.INVOCATION_COUNT)); + Assert.assertEquals(fcount, result.getDetails().get(MyHealthCheck.FAILURE_COUNT)); + + Thread.sleep(550); + } + + Assert.assertEquals(HealthCheck.State.DOWN, check.call().getState()); + } + + + // ******************************** + // + // ******************************** + + private class MyHealthCheck extends AbstractHealthCheck { + private HealthCheck.State state; + + MyHealthCheck() { + super("my"); + + this.state = HealthCheck.State.UP; + } + + public void setState(State state) { + this.state = state; + } + + public State getState() { + return state; + } + + @Override + public void doCall(HealthCheckResultBuilder builder, Map options) { + builder.state(state); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/00d1d70b/camel-core/src/test/java/org/apache/camel/impl/verifier/DefaultComponentVerifierTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/verifier/DefaultComponentVerifierTest.java b/camel-core/src/test/java/org/apache/camel/impl/verifier/DefaultComponentVerifierTest.java index 9ade841..8c5207b 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/verifier/DefaultComponentVerifierTest.java +++ b/camel-core/src/test/java/org/apache/camel/impl/verifier/DefaultComponentVerifierTest.java @@ -39,7 +39,7 @@ public class DefaultComponentVerifierTest extends ContextTestSupport { protected void setUp() throws Exception { super.setUp(); - this.verifier = new DefaultComponentVerifierExtension("timer", context); + this.verifier = new TestVerifier(); } // ************************************* @@ -81,4 +81,10 @@ public class DefaultComponentVerifierTest extends ContextTestSupport { Assert.assertEquals("fixedRate has wrong value (wrong)", result.getErrors().get(0).getDescription()); Assert.assertTrue(result.getErrors().get(0).getParameterKeys().contains("fixedRate")); } + + private class TestVerifier extends DefaultComponentVerifierExtension { + public TestVerifier() { + super("timer", context); + } + } }