camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [08/12] camel git commit: CAMEL-9879: Circuit Breaker EIP - That is using hystrix
Date Tue, 19 Apr 2016 12:02:47 GMT
CAMEL-9879: Circuit Breaker EIP - That is using hystrix


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f0ce65a1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f0ce65a1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f0ce65a1

Branch: refs/heads/hys
Commit: f0ce65a19ec35eb9f6bbf2ff057cedf01352069a
Parents: 97e65ac
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Tue Apr 19 13:13:48 2016 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Tue Apr 19 13:13:48 2016 +0200

----------------------------------------------------------------------
 .../model/HystrixConfigurationDefinition.java   | 641 +++++++++++++++++++
 .../apache/camel/model/HystrixDefinition.java   |  65 +-
 .../resources/org/apache/camel/model/jaxb.index |   1 +
 .../hystrix/processor/HystrixProcessor.java     |   9 +-
 .../processor/HystrixProcessorCommand.java      |   5 +-
 .../processor/HystrixProcessorFactory.java      | 106 ++-
 6 files changed, 808 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f0ce65a1/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java
b/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java
new file mode 100644
index 0000000..5ea084e
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/model/HystrixConfigurationDefinition.java
@@ -0,0 +1,641 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.spi.Metadata;
+
+@Metadata(label = "eip,routing,circuitbreaker")
+@XmlRootElement(name = "hystrixConfiguration")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class HystrixConfigurationDefinition {
+
+    @XmlTransient
+    private HystrixDefinition parent;
+
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "true")
+    private Boolean circuitBreakerEnabled;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "50")
+    private Integer circuitBreakerErrorThresholdPercentage;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "false") // not forced closed by default, otherwise the breaker could never trip
+    private Boolean circuitBreakerForceClosed;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "false")
+    private Boolean circuitBreakerForceOpen;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "20")
+    private Integer circuitBreakerRequestVolumeThreshold;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "5000")
+    private Integer circuitBreakerSleepWindowInMilliseconds;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "20")
+    private Integer executionIsolationSemaphoreMaxConcurrentRequests;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "THREAD")
+    private String executionIsolationStrategy;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "true")
+    private Boolean executionIsolationThreadInterruptOnTimeout;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "1000")
+    private Integer executionTimeoutInMilliseconds;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "true")
+    private Boolean executionTimeoutEnabled;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "10")
+    private Integer fallbackIsolationSemaphoreMaxConcurrentRequests;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "true")
+    private Boolean fallbackEnabled;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "500")
+    private Integer metricsHealthSnapshotIntervalInMilliseconds;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "10")
+    private Integer metricsRollingPercentileBucketSize;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "true")
+    private Boolean metricsRollingPercentileEnabled;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "60000") // duration in milliseconds, not a boolean flag
+    private Integer metricsRollingPercentileWindowInMilliseconds;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "6")
+    private Integer metricsRollingPercentileWindowBuckets;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "10000")
+    private Integer metricsRollingStatisticalWindowInMilliseconds;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "10")
+    private Integer metricsRollingStatisticalWindowBuckets;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "true")
+    private Boolean requestCacheEnabled;
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "true")
+    private Boolean requestLogEnabled;
+
+    // thread-pool
+
+    @XmlAttribute
+    @Metadata(label = "threadpool", defaultValue = "10")
+    private Integer corePoolSize;
+    @XmlAttribute
+    @Metadata(label = "threadpool", defaultValue = "1")
+    private Integer keepAliveTime;
+    @XmlAttribute
+    @Metadata(label = "threadpool", defaultValue = "-1")
+    private Integer maxQueueSize;
+    @XmlAttribute
+    @Metadata(label = "threadpool", defaultValue = "5")
+    private Integer queueSizeRejectionThreshold;
+    @XmlAttribute
+    @Metadata(label = "threadpool", defaultValue = "10000")
+    private Integer threadPoolRollingNumberStatisticalWindowInMilliseconds;
+    @XmlAttribute
+    @Metadata(label = "threadpool", defaultValue = "10")
+    private Integer threadPoolRollingNumberStatisticalWindowBuckets;
+
+    public HystrixConfigurationDefinition() {
+    }
+
+    public HystrixConfigurationDefinition(HystrixDefinition parent) {
+        this.parent = parent;
+    }
+
+    // Getter/Setter
+    // -------------------------------------------------------------------------
+
+    public Boolean getCircuitBreakerEnabled() {
+        return circuitBreakerEnabled;
+    }
+
+    public void setCircuitBreakerEnabled(Boolean circuitBreakerEnabled) {
+        this.circuitBreakerEnabled = circuitBreakerEnabled;
+    }
+
+    public Integer getCircuitBreakerErrorThresholdPercentage() {
+        return circuitBreakerErrorThresholdPercentage;
+    }
+
+    public void setCircuitBreakerErrorThresholdPercentage(Integer circuitBreakerErrorThresholdPercentage)
{
+        this.circuitBreakerErrorThresholdPercentage = circuitBreakerErrorThresholdPercentage;
+    }
+
+    public Boolean getCircuitBreakerForceClosed() {
+        return circuitBreakerForceClosed;
+    }
+
+    public void setCircuitBreakerForceClosed(Boolean circuitBreakerForceClosed) {
+        this.circuitBreakerForceClosed = circuitBreakerForceClosed;
+    }
+
+    public Boolean getCircuitBreakerForceOpen() {
+        return circuitBreakerForceOpen;
+    }
+
+    public void setCircuitBreakerForceOpen(Boolean circuitBreakerForceOpen) {
+        this.circuitBreakerForceOpen = circuitBreakerForceOpen;
+    }
+
+    public Integer getCircuitBreakerRequestVolumeThreshold() {
+        return circuitBreakerRequestVolumeThreshold;
+    }
+
+    public void setCircuitBreakerRequestVolumeThreshold(Integer circuitBreakerRequestVolumeThreshold)
{
+        this.circuitBreakerRequestVolumeThreshold = circuitBreakerRequestVolumeThreshold;
+    }
+
+    public Integer getCircuitBreakerSleepWindowInMilliseconds() {
+        return circuitBreakerSleepWindowInMilliseconds;
+    }
+
+    public void setCircuitBreakerSleepWindowInMilliseconds(Integer circuitBreakerSleepWindowInMilliseconds)
{
+        this.circuitBreakerSleepWindowInMilliseconds = circuitBreakerSleepWindowInMilliseconds;
+    }
+
+    public Integer getExecutionIsolationSemaphoreMaxConcurrentRequests() {
+        return executionIsolationSemaphoreMaxConcurrentRequests;
+    }
+
+    public void setExecutionIsolationSemaphoreMaxConcurrentRequests(Integer executionIsolationSemaphoreMaxConcurrentRequests)
{
+        this.executionIsolationSemaphoreMaxConcurrentRequests = executionIsolationSemaphoreMaxConcurrentRequests;
+    }
+
+    public String getExecutionIsolationStrategy() {
+        return executionIsolationStrategy;
+    }
+
+    public void setExecutionIsolationStrategy(String executionIsolationStrategy) {
+        this.executionIsolationStrategy = executionIsolationStrategy;
+    }
+
+    public Boolean getExecutionIsolationThreadInterruptOnTimeout() {
+        return executionIsolationThreadInterruptOnTimeout;
+    }
+
+    public void setExecutionIsolationThreadInterruptOnTimeout(Boolean executionIsolationThreadInterruptOnTimeout)
{
+        this.executionIsolationThreadInterruptOnTimeout = executionIsolationThreadInterruptOnTimeout;
+    }
+
+    public Integer getExecutionTimeoutInMilliseconds() {
+        return executionTimeoutInMilliseconds;
+    }
+
+    public void setExecutionTimeoutInMilliseconds(Integer executionTimeoutInMilliseconds)
{
+        this.executionTimeoutInMilliseconds = executionTimeoutInMilliseconds;
+    }
+
+    public Boolean getExecutionTimeoutEnabled() {
+        return executionTimeoutEnabled;
+    }
+
+    public void setExecutionTimeoutEnabled(Boolean executionTimeoutEnabled) {
+        this.executionTimeoutEnabled = executionTimeoutEnabled;
+    }
+
+    public Integer getFallbackIsolationSemaphoreMaxConcurrentRequests() {
+        return fallbackIsolationSemaphoreMaxConcurrentRequests;
+    }
+
+    public void setFallbackIsolationSemaphoreMaxConcurrentRequests(Integer fallbackIsolationSemaphoreMaxConcurrentRequests)
{
+        this.fallbackIsolationSemaphoreMaxConcurrentRequests = fallbackIsolationSemaphoreMaxConcurrentRequests;
+    }
+
+    public Boolean getFallbackEnabled() {
+        return fallbackEnabled;
+    }
+
+    public void setFallbackEnabled(Boolean fallbackEnabled) {
+        this.fallbackEnabled = fallbackEnabled;
+    }
+
+    public Integer getMetricsHealthSnapshotIntervalInMilliseconds() {
+        return metricsHealthSnapshotIntervalInMilliseconds;
+    }
+
+    public void setMetricsHealthSnapshotIntervalInMilliseconds(Integer metricsHealthSnapshotIntervalInMilliseconds)
{
+        this.metricsHealthSnapshotIntervalInMilliseconds = metricsHealthSnapshotIntervalInMilliseconds;
+    }
+
+    public Integer getMetricsRollingPercentileBucketSize() {
+        return metricsRollingPercentileBucketSize;
+    }
+
+    public void setMetricsRollingPercentileBucketSize(Integer metricsRollingPercentileBucketSize)
{
+        this.metricsRollingPercentileBucketSize = metricsRollingPercentileBucketSize;
+    }
+
+    public Boolean getMetricsRollingPercentileEnabled() {
+        return metricsRollingPercentileEnabled;
+    }
+
+    public void setMetricsRollingPercentileEnabled(Boolean metricsRollingPercentileEnabled)
{
+        this.metricsRollingPercentileEnabled = metricsRollingPercentileEnabled;
+    }
+
+    public Integer getMetricsRollingPercentileWindowInMilliseconds() {
+        return metricsRollingPercentileWindowInMilliseconds;
+    }
+
+    public void setMetricsRollingPercentileWindowInMilliseconds(Integer metricsRollingPercentileWindowInMilliseconds)
{
+        this.metricsRollingPercentileWindowInMilliseconds = metricsRollingPercentileWindowInMilliseconds;
+    }
+
+    public Integer getMetricsRollingPercentileWindowBuckets() {
+        return metricsRollingPercentileWindowBuckets;
+    }
+
+    public void setMetricsRollingPercentileWindowBuckets(Integer metricsRollingPercentileWindowBuckets)
{
+        this.metricsRollingPercentileWindowBuckets = metricsRollingPercentileWindowBuckets;
+    }
+
+    public Integer getMetricsRollingStatisticalWindowInMilliseconds() {
+        return metricsRollingStatisticalWindowInMilliseconds;
+    }
+
+    public void setMetricsRollingStatisticalWindowInMilliseconds(Integer metricsRollingStatisticalWindowInMilliseconds)
{
+        this.metricsRollingStatisticalWindowInMilliseconds = metricsRollingStatisticalWindowInMilliseconds;
+    }
+
+    public Integer getMetricsRollingStatisticalWindowBuckets() {
+        return metricsRollingStatisticalWindowBuckets;
+    }
+
+    public void setMetricsRollingStatisticalWindowBuckets(Integer metricsRollingStatisticalWindowBuckets)
{
+        this.metricsRollingStatisticalWindowBuckets = metricsRollingStatisticalWindowBuckets;
+    }
+
+    public Boolean getRequestCacheEnabled() {
+        return requestCacheEnabled;
+    }
+
+    public void setRequestCacheEnabled(Boolean requestCacheEnabled) {
+        this.requestCacheEnabled = requestCacheEnabled;
+    }
+
+    public Boolean getRequestLogEnabled() {
+        return requestLogEnabled;
+    }
+
+    public void setRequestLogEnabled(Boolean requestLogEnabled) {
+        this.requestLogEnabled = requestLogEnabled;
+    }
+
+    public Integer getCorePoolSize() {
+        return corePoolSize;
+    }
+
+    public void setCorePoolSize(Integer corePoolSize) {
+        this.corePoolSize = corePoolSize;
+    }
+
+    public Integer getKeepAliveTime() {
+        return keepAliveTime;
+    }
+
+    public void setKeepAliveTime(Integer keepAliveTime) {
+        this.keepAliveTime = keepAliveTime;
+    }
+
+    public Integer getMaxQueueSize() {
+        return maxQueueSize;
+    }
+
+    public void setMaxQueueSize(Integer maxQueueSize) {
+        this.maxQueueSize = maxQueueSize;
+    }
+
+    public Integer getQueueSizeRejectionThreshold() {
+        return queueSizeRejectionThreshold;
+    }
+
+    public void setQueueSizeRejectionThreshold(Integer queueSizeRejectionThreshold) {
+        this.queueSizeRejectionThreshold = queueSizeRejectionThreshold;
+    }
+
+    public Integer getThreadPoolRollingNumberStatisticalWindowInMilliseconds() {
+        return threadPoolRollingNumberStatisticalWindowInMilliseconds;
+    }
+
+    public void setThreadPoolRollingNumberStatisticalWindowInMilliseconds(Integer threadPoolRollingNumberStatisticalWindowInMilliseconds)
{
+        this.threadPoolRollingNumberStatisticalWindowInMilliseconds = threadPoolRollingNumberStatisticalWindowInMilliseconds;
+    }
+
+    public Integer getThreadPoolRollingNumberStatisticalWindowBuckets() {
+        return threadPoolRollingNumberStatisticalWindowBuckets;
+    }
+
+    public void setThreadPoolRollingNumberStatisticalWindowBuckets(Integer threadPoolRollingNumberStatisticalWindowBuckets)
{
+        this.threadPoolRollingNumberStatisticalWindowBuckets = threadPoolRollingNumberStatisticalWindowBuckets;
+    }
+
+
+    // Fluent API
+    // -------------------------------------------------------------------------
+
+    /**
+     * Whether to use a HystrixCircuitBreaker or not. If false no circuit-breaker logic will
be used and all requests permitted.
+     * <p>
+     * This is similar in effect to circuitBreakerForceClosed(), except that force-closed continues tracking metrics and knowing whether
+     * the circuit should be open/closed, whereas disabling this property results in not even instantiating a circuit-breaker.
+     */
+    public HystrixConfigurationDefinition circuitBreakerEnabled(Boolean circuitBreakerEnabled)
{
+        this.circuitBreakerEnabled = circuitBreakerEnabled;
+        return this;
+    }
+
+    /**
+     * Error percentage threshold (as whole number such as 50) at which point the circuit
breaker will trip open and reject requests.
+     * <p>
+     * It will stay tripped for the duration defined in circuitBreakerSleepWindowInMilliseconds.
+     * <p>
+     * The error percentage this is compared against comes from HystrixCommandMetrics.getHealthCounts().
+     */
+    public HystrixConfigurationDefinition circuitBreakerErrorThresholdPercentage(Integer
circuitBreakerErrorThresholdPercentage) {
+        this.circuitBreakerErrorThresholdPercentage = circuitBreakerErrorThresholdPercentage;
+        return this;
+    }
+
+    /**
+     * If true the HystrixCircuitBreaker#allowRequest() will always return true to allow
requests regardless of
+     * the error percentage from HystrixCommandMetrics.getHealthCounts().
+     * <p>
+     * The circuitBreakerForceOpen() property takes precedence, so if it is set to true this property does nothing.
+     */
+    public HystrixConfigurationDefinition circuitBreakerForceClosed(Boolean circuitBreakerForceClosed)
{
+        this.circuitBreakerForceClosed = circuitBreakerForceClosed;
+        return this;
+    }
+
+    /**
+     * If true the HystrixCircuitBreaker.allowRequest() will always return false, causing
the circuit to be open (tripped) and reject all requests.
+     * <p>
+     * This property takes precedence over circuitBreakerForceClosed().
+     */
+    public HystrixConfigurationDefinition circuitBreakerForceOpen(Boolean circuitBreakerForceOpen)
{
+        this.circuitBreakerForceOpen = circuitBreakerForceOpen;
+        return this;
+    }
+
+    /**
+     * Minimum number of requests in the metricsRollingStatisticalWindowInMilliseconds()
that must exist before the HystrixCircuitBreaker will trip.
+     * <p>
+     * If below this number the circuit will not trip regardless of error percentage.
+     */
+    public HystrixConfigurationDefinition circuitBreakerRequestVolumeThreshold(Integer circuitBreakerRequestVolumeThreshold)
{
+        this.circuitBreakerRequestVolumeThreshold = circuitBreakerRequestVolumeThreshold;
+        return this;
+    }
+
+    /**
+     * The time in milliseconds after a HystrixCircuitBreaker trips open that it should wait
before trying requests again.
+     */
+    public HystrixConfigurationDefinition circuitBreakerSleepWindowInMilliseconds(Integer
circuitBreakerSleepWindowInMilliseconds) {
+        this.circuitBreakerSleepWindowInMilliseconds = circuitBreakerSleepWindowInMilliseconds;
+        return this;
+    }
+
+    /**
+     * Number of concurrent requests permitted to HystrixCommand.run(). Requests beyond the
concurrent limit will be rejected.
+     * <p>
+     * Applicable only when executionIsolationStrategy == SEMAPHORE.
+     */
+    public HystrixConfigurationDefinition executionIsolationSemaphoreMaxConcurrentRequests(Integer
executionIsolationSemaphoreMaxConcurrentRequests) {
+        this.executionIsolationSemaphoreMaxConcurrentRequests = executionIsolationSemaphoreMaxConcurrentRequests;
+        return this;
+    }
+
+    /**
+     * What isolation strategy HystrixCommand.run() will be executed with.
+     * <p>
+     * If THREAD then it will be executed on a separate thread and concurrent requests limited
by the number of threads in the thread-pool.
+     * <p>
+     * If SEMAPHORE then it will be executed on the calling thread and concurrent requests
limited by the semaphore count.
+     */
+    public HystrixConfigurationDefinition executionIsolationStrategy(String executionIsolationStrategy)
{
+        this.executionIsolationStrategy = executionIsolationStrategy;
+        return this;
+    }
+
+    /**
+     * Whether the execution thread should attempt an interrupt (using {@link Future#cancel})
when a thread times out.
+     * <p>
+     * Applicable only when executionIsolationStrategy() == THREAD.
+     */
+    public HystrixConfigurationDefinition executionIsolationThreadInterruptOnTimeout(Boolean
executionIsolationThreadInterruptOnTimeout) {
+        this.executionIsolationThreadInterruptOnTimeout = executionIsolationThreadInterruptOnTimeout;
+        return this;
+    }
+
+    /**
+     * Time in milliseconds at which point the command will timeout and halt execution.
+     * <p>
+     * If {@link #executionIsolationThreadInterruptOnTimeout} == true and the command is
thread-isolated, the executing thread will be interrupted.
+     * If the command is semaphore-isolated and a HystrixObservableCommand, that command
will get unsubscribed.
+     */
+    public HystrixConfigurationDefinition executionTimeoutInMilliseconds(Integer executionTimeoutInMilliseconds)
{
+        this.executionTimeoutInMilliseconds = executionTimeoutInMilliseconds;
+        return this;
+    }
+
+    /**
+     * Whether the timeout mechanism is enabled for this command
+     */
+    public HystrixConfigurationDefinition executionTimeoutEnabled(Boolean executionTimeoutEnabled)
{
+        this.executionTimeoutEnabled = executionTimeoutEnabled;
+        return this;
+    }
+
+    /**
+     * Number of concurrent requests permitted to HystrixCommand.getFallback().
+     * Requests beyond the concurrent limit will fail-fast and not attempt retrieving a fallback.
+     */
+    public HystrixConfigurationDefinition fallbackIsolationSemaphoreMaxConcurrentRequests(Integer
fallbackIsolationSemaphoreMaxConcurrentRequests) {
+        this.fallbackIsolationSemaphoreMaxConcurrentRequests = fallbackIsolationSemaphoreMaxConcurrentRequests;
+        return this;
+    }
+
+    /**
+     * Whether HystrixCommand.getFallback() should be attempted when failure occurs.
+     */
+    public HystrixConfigurationDefinition fallbackEnabled(Boolean fallbackEnabled) {
+        this.fallbackEnabled = fallbackEnabled;
+        return this;
+    }
+
+    /**
+     * Time in milliseconds to wait between allowing health snapshots to be taken that calculate
success and error
+     * percentages and affect HystrixCircuitBreaker.isOpen() status.
+     * <p>
+     * On high-volume circuits the continual calculation of error percentage can become CPU
intensive thus this controls how often it is calculated.
+     */
+    public HystrixConfigurationDefinition metricsHealthSnapshotIntervalInMilliseconds(Integer
metricsHealthSnapshotIntervalInMilliseconds) {
+        this.metricsHealthSnapshotIntervalInMilliseconds = metricsHealthSnapshotIntervalInMilliseconds;
+        return this;
+    }
+
+    /**
+     * Maximum number of values stored in each bucket of the rolling percentile.
+     * This is passed into HystrixRollingPercentile inside HystrixCommandMetrics.
+     */
+    public HystrixConfigurationDefinition metricsRollingPercentileBucketSize(Integer metricsRollingPercentileBucketSize)
{
+        this.metricsRollingPercentileBucketSize = metricsRollingPercentileBucketSize;
+        return this;
+    }
+
+    /**
+     * Whether percentile metrics should be captured using HystrixRollingPercentile inside
HystrixCommandMetrics.
+     */
+    public HystrixConfigurationDefinition metricsRollingPercentileEnabled(Boolean metricsRollingPercentileEnabled)
{
+        this.metricsRollingPercentileEnabled = metricsRollingPercentileEnabled;
+        return this;
+    }
+
+    /**
+     * Duration of percentile rolling window in milliseconds.
+     * This is passed into HystrixRollingPercentile inside HystrixCommandMetrics.
+     */
+    public HystrixConfigurationDefinition metricsRollingPercentileWindowInMilliseconds(Integer
metricsRollingPercentileWindowInMilliseconds) {
+        this.metricsRollingPercentileWindowInMilliseconds = metricsRollingPercentileWindowInMilliseconds;
+        return this;
+    }
+
+    /**
+     * Number of buckets the rolling percentile window is broken into.
+     * This is passed into HystrixRollingPercentile inside HystrixCommandMetrics.
+     */
+    public HystrixConfigurationDefinition metricsRollingPercentileWindowBuckets(Integer metricsRollingPercentileWindowBuckets)
{
+        this.metricsRollingPercentileWindowBuckets = metricsRollingPercentileWindowBuckets;
+        return this;
+    }
+
+    /**
+     * Duration of statistical rolling window in milliseconds.
+     * This is passed into HystrixRollingNumber inside HystrixCommandMetrics.
+     */
+    public HystrixConfigurationDefinition metricsRollingStatisticalWindowInMilliseconds(Integer
metricsRollingStatisticalWindowInMilliseconds) {
+        this.metricsRollingStatisticalWindowInMilliseconds = metricsRollingStatisticalWindowInMilliseconds;
+        return this;
+    }
+
+    /**
+     * Number of buckets the rolling statistical window is broken into.
+     * This is passed into HystrixRollingNumber inside HystrixCommandMetrics.
+     */
+    public HystrixConfigurationDefinition metricsRollingStatisticalWindowBuckets(Integer
metricsRollingStatisticalWindowBuckets) {
+        this.metricsRollingStatisticalWindowBuckets = metricsRollingStatisticalWindowBuckets;
+        return this;
+    }
+
+    /**
+     * Whether HystrixCommand.getCacheKey() should be used with HystrixRequestCache
+     * to provide de-duplication functionality via request-scoped caching.
+     */
+    public HystrixConfigurationDefinition requestCacheEnabled(Boolean requestCacheEnabled)
{
+        this.requestCacheEnabled = requestCacheEnabled;
+        return this;
+    }
+
+    /**
+     * Whether HystrixCommand execution and events should be logged to HystrixRequestLog.
+     */
+    public HystrixConfigurationDefinition requestLogEnabled(Boolean requestLogEnabled) {
+        this.requestLogEnabled = requestLogEnabled;
+        return this;
+    }
+
+    /**
+     * Core thread-pool size that gets passed to {@link java.util.concurrent.ThreadPoolExecutor#setCorePoolSize(int)}
+     */
+    public HystrixConfigurationDefinition corePoolSize(Integer corePoolSize) {
+        this.corePoolSize = corePoolSize;
+        return this;
+    }
+
+    /**
+     * Keep-alive time in minutes that gets passed to {@link ThreadPoolExecutor#setKeepAliveTime(long,
TimeUnit)}
+     */
+    public HystrixConfigurationDefinition keepAliveTime(Integer keepAliveTime) {
+        this.keepAliveTime = keepAliveTime;
+        return this;
+    }
+
+    /**
+     * Max queue size that gets passed to {@link BlockingQueue} in HystrixConcurrencyStrategy.getBlockingQueue(int)
+     *
+     * This should only affect the instantiation of a thread pool - the queue size cannot be changed on the fly.
+     * For that, use queueSizeRejectionThreshold().
+     */
+    public HystrixConfigurationDefinition maxQueueSize(Integer maxQueueSize) {
+        this.maxQueueSize = maxQueueSize;
+        return this;
+    }
+
+    /**
+     * Queue size rejection threshold is an artificial "max" size at which rejections will
occur even
+     * if {@link #maxQueueSize} has not been reached. This is done because the {@link #maxQueueSize}
+     * of a {@link BlockingQueue} can not be dynamically changed and we want to support dynamically
+     * changing the queue size that affects rejections.
+     * <p>
+     * This is used by HystrixCommand when queuing a thread for execution.
+     */
+    public HystrixConfigurationDefinition queueSizeRejectionThreshold(Integer queueSizeRejectionThreshold)
{
+        this.queueSizeRejectionThreshold = queueSizeRejectionThreshold;
+        return this;
+    }
+
+    /**
+     * Duration of statistical rolling window in milliseconds.
+     * This is passed into HystrixRollingNumber inside each HystrixThreadPoolMetrics instance.
+     */
+    public HystrixConfigurationDefinition threadPoolRollingNumberStatisticalWindowInMilliseconds(Integer
threadPoolRollingNumberStatisticalWindowInMilliseconds) {
+        this.threadPoolRollingNumberStatisticalWindowInMilliseconds = threadPoolRollingNumberStatisticalWindowInMilliseconds;
+        return this;
+    }
+
+    /**
+     * Number of buckets the rolling statistical window is broken into.
+     * This is passed into HystrixRollingNumber inside each HystrixThreadPoolMetrics instance.
+     */
+    public HystrixConfigurationDefinition threadPoolRollingNumberStatisticalWindowBuckets(Integer
threadPoolRollingNumberStatisticalWindowBuckets) {
+        this.threadPoolRollingNumberStatisticalWindowBuckets = threadPoolRollingNumberStatisticalWindowBuckets;
+        return this;
+    }
+
+    /**
+     * End of configuration
+     */
+    public HystrixDefinition end() {
+        return parent;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/f0ce65a1/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java b/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
index c8abd53..171da30 100644
--- a/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
@@ -16,10 +16,13 @@
  */
 package org.apache.camel.model;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlElementRef;
 import javax.xml.bind.annotation.XmlRootElement;
 
 import org.apache.camel.Processor;
@@ -32,6 +35,10 @@ import org.apache.camel.spi.RouteContext;
 public class HystrixDefinition extends OutputDefinition<HystrixDefinition> {
 
     @XmlElement
+    private HystrixConfigurationDefinition hystrixConfiguration;
+    @XmlElementRef
+    protected List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
+    @XmlElement
     private FallbackDefinition fallback;
 
     public HystrixDefinition() {
@@ -52,6 +59,23 @@ public class HystrixDefinition extends OutputDefinition<HystrixDefinition>
{
         throw new IllegalStateException("Cannot find camel-hystrix on the classpath.");
     }
 
+    public List<ProcessorDefinition<?>> getOutputs() {
+        return outputs;
+    }
+
+    public boolean isOutputSupported() {
+        return true;
+    }
+
+    public void setOutputs(List<ProcessorDefinition<?>> outputs) {
+        this.outputs = outputs;
+        if (outputs != null) {
+            for (ProcessorDefinition<?> output : outputs) {
+                configureChild(output);
+            }
+        }
+    }
+
     @Override
     public void addOutput(ProcessorDefinition<?> output) {
         if (output instanceof FallbackDefinition) {
@@ -74,14 +98,6 @@ public class HystrixDefinition extends OutputDefinition<HystrixDefinition>
{
         return super.end();
     }
 
-    public FallbackDefinition getFallback() {
-        return fallback;
-    }
-
-    public void setFallback(FallbackDefinition fallback) {
-        this.fallback = fallback;
-    }
-
     protected void preCreateProcessor() {
         // move the fallback from outputs to fallback which we need to ensure
         // such as when using the XML DSL
@@ -95,13 +111,30 @@ public class HystrixDefinition extends OutputDefinition<HystrixDefinition> {
         }
     }
 
+    // Getter/Setter
+    // -------------------------------------------------------------------------
+
+    public FallbackDefinition getFallback() {
+        return fallback;
+    }
+
+    public void setFallback(FallbackDefinition fallback) {
+        this.fallback = fallback;
+    }
+
+    public HystrixConfigurationDefinition getHystrixConfiguration() {
+        return hystrixConfiguration;
+    }
+
+    public void setHystrixConfiguration(HystrixConfigurationDefinition hystrixConfiguration) {
+        this.hystrixConfiguration = hystrixConfiguration;
+    }
+
     // Fluent API
     // -------------------------------------------------------------------------
 
     /**
-     * Sets the otherwise node
-     *
-     * @return the builder
+     * Sets the fallback node
      */
     public HystrixDefinition fallback() {
         fallback = new FallbackDefinition();
@@ -109,4 +142,14 @@ public class HystrixDefinition extends OutputDefinition<HystrixDefinition> {
         return this;
     }
 
+    /**
+     * Configures the Hystrix EIP
+     * <p/>
+     * Use <tt>end</tt> when configuration is complete, to return back to the Hystrix EIP.
+     */
+    public HystrixConfigurationDefinition configure() {
+        hystrixConfiguration = new HystrixConfigurationDefinition(this);
+        return hystrixConfiguration;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f0ce65a1/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
----------------------------------------------------------------------
diff --git a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
index 1d80346..9315335 100644
--- a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
+++ b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
@@ -32,6 +32,7 @@ FilterDefinition
 FinallyDefinition
 FromDefinition
 HystrixDefinition
+HystrixConfigurationDefinition
 IdempotentConsumerDefinition
 InOnlyDefinition
 InOutDefinition

http://git-wip-us.apache.org/repos/asf/camel/blob/f0ce65a1/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
index c3b272d..3400af8 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.hystrix.processor;
 import java.util.ArrayList;
 import java.util.List;
 
-import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixCommand;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
@@ -36,11 +36,13 @@ import org.apache.camel.util.AsyncProcessorHelper;
 public class HystrixProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, org.apache.camel.Traceable, IdAware {
 
     private String id;
+    private final HystrixCommand.Setter setter;
     private final AsyncProcessor processor;
     private final AsyncProcessor fallback;
 
-    public HystrixProcessor(String id, Processor processor, Processor fallback) {
+    public HystrixProcessor(String id, HystrixCommand.Setter setter, Processor processor, Processor fallback) {
         this.id = id;
+        this.setter = setter;
         this.processor = AsyncProcessorConverterHelper.convert(processor);
         this.fallback = AsyncProcessorConverterHelper.convert(fallback);
     }
@@ -85,8 +87,7 @@ public class HystrixProcessor extends ServiceSupport implements AsyncProcessor,
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        HystrixCommandGroupKey key = HystrixCommandGroupKey.Factory.asKey(id);
-        HystrixProcessorCommand command = new HystrixProcessorCommand(key, exchange, callback, processor, fallback);
+        HystrixProcessorCommand command = new HystrixProcessorCommand(setter, exchange, callback, processor, fallback);
         try {
             command.queue();
         } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/camel/blob/f0ce65a1/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 16c0645..c728d6a 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -35,9 +35,8 @@ public class HystrixProcessorCommand extends HystrixCommand<Exchange> {
     private final AsyncProcessor processor;
     private final AsyncProcessor fallback;
 
-    public HystrixProcessorCommand(HystrixCommandGroupKey group, Exchange exchange, AsyncCallback callback,
-                                   AsyncProcessor processor, AsyncProcessor fallback) {
-        super(group);
+    public HystrixProcessorCommand(Setter setter, Exchange exchange, AsyncCallback callback, AsyncProcessor processor, AsyncProcessor fallback) {
+        super(setter);
         this.exchange = exchange;
         this.callback = callback;
         this.processor = processor;

http://git-wip-us.apache.org/repos/asf/camel/blob/f0ce65a1/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
index b6d3a80..8e4a7f1 100644
--- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
+++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
@@ -16,7 +16,12 @@
  */
 package org.apache.camel.component.hystrix.processor;
 
+import com.netflix.hystrix.HystrixCommand;
+import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixCommandProperties;
+import com.netflix.hystrix.HystrixThreadPoolProperties;
 import org.apache.camel.Processor;
+import org.apache.camel.model.HystrixConfigurationDefinition;
 import org.apache.camel.model.HystrixDefinition;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.spi.ProcessorFactory;
@@ -46,7 +51,106 @@ public class HystrixProcessorFactory implements ProcessorFactory {
                 fallback = cb.getFallback().createProcessor(routeContext);
             }
 
-            return new HystrixProcessor(id, processor, fallback);
+            // create setter using the default options
+            HystrixCommand.Setter setter = HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(id));
+            HystrixCommandProperties.Setter command = HystrixCommandProperties.Setter();
+            setter.andCommandPropertiesDefaults(command);
+            HystrixThreadPoolProperties.Setter threadPool = HystrixThreadPoolProperties.Setter();
+            setter.andThreadPoolPropertiesDefaults(threadPool);
+
+            // any custom configuration then override the setter
+            if (cb.getHystrixConfiguration() != null) {
+                HystrixConfigurationDefinition config = cb.getHystrixConfiguration();
+
+                // command
+                if (config.getCircuitBreakerEnabled() != null) {
+                    command.withCircuitBreakerEnabled(config.getCircuitBreakerEnabled());
+                }
+                if (config.getCircuitBreakerErrorThresholdPercentage() != null) {
+                    command.withCircuitBreakerErrorThresholdPercentage(config.getCircuitBreakerErrorThresholdPercentage());
+                }
+                if (config.getCircuitBreakerForceClosed() != null) {
+                    command.withCircuitBreakerForceClosed(config.getCircuitBreakerForceClosed());
+                }
+                if (config.getCircuitBreakerForceOpen() != null) {
+                    command.withCircuitBreakerForceOpen(config.getCircuitBreakerForceOpen());
+                }
+                if (config.getCircuitBreakerRequestVolumeThreshold() != null) {
+                    command.withCircuitBreakerRequestVolumeThreshold(config.getCircuitBreakerRequestVolumeThreshold());
+                }
+                if (config.getCircuitBreakerSleepWindowInMilliseconds() != null) {
+                    command.withCircuitBreakerSleepWindowInMilliseconds(config.getCircuitBreakerSleepWindowInMilliseconds());
+                }
+                if (config.getExecutionIsolationSemaphoreMaxConcurrentRequests() != null) {
+                    command.withExecutionIsolationSemaphoreMaxConcurrentRequests(config.getExecutionIsolationSemaphoreMaxConcurrentRequests());
+                }
+                if (config.getExecutionIsolationStrategy() != null) {
+                    command.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.valueOf(config.getExecutionIsolationStrategy()));
+                }
+                if (config.getExecutionIsolationThreadInterruptOnTimeout() != null) {
+                    command.withExecutionIsolationThreadInterruptOnTimeout(config.getExecutionIsolationThreadInterruptOnTimeout());
+                }
+                if (config.getExecutionTimeoutInMilliseconds() != null) {
+                    command.withExecutionTimeoutInMilliseconds(config.getExecutionTimeoutInMilliseconds());
+                }
+                if (config.getExecutionTimeoutEnabled() != null) {
+                    command.withExecutionTimeoutEnabled(config.getExecutionTimeoutEnabled());
+                }
+                if (config.getFallbackIsolationSemaphoreMaxConcurrentRequests() != null) {
+                    command.withFallbackIsolationSemaphoreMaxConcurrentRequests(config.getFallbackIsolationSemaphoreMaxConcurrentRequests());
+                }
+                if (config.getFallbackEnabled() != null) {
+                    command.withFallbackEnabled(config.getFallbackEnabled());
+                }
+                if (config.getMetricsHealthSnapshotIntervalInMilliseconds() != null) {
+                    command.withMetricsHealthSnapshotIntervalInMilliseconds(config.getMetricsHealthSnapshotIntervalInMilliseconds());
+                }
+                if (config.getMetricsRollingPercentileBucketSize() != null) {
+                    command.withMetricsRollingPercentileBucketSize(config.getMetricsRollingPercentileBucketSize());
+                }
+                if (config.getMetricsRollingPercentileEnabled() != null) {
+                    command.withMetricsRollingPercentileEnabled(config.getMetricsRollingPercentileEnabled());
+                }
+                if (config.getMetricsRollingPercentileWindowInMilliseconds() != null) {
+                    command.withMetricsRollingPercentileWindowInMilliseconds(config.getMetricsRollingPercentileWindowInMilliseconds());
+                }
+                if (config.getMetricsRollingPercentileWindowBuckets() != null) {
+                    command.withMetricsRollingPercentileWindowBuckets(config.getMetricsRollingPercentileWindowBuckets());
+                }
+                if (config.getMetricsRollingStatisticalWindowInMilliseconds() != null) {
+                    command.withMetricsRollingStatisticalWindowInMilliseconds(config.getMetricsRollingStatisticalWindowInMilliseconds());
+                }
+                if (config.getMetricsRollingStatisticalWindowBuckets() != null) {
+                    command.withMetricsRollingStatisticalWindowBuckets(config.getMetricsRollingStatisticalWindowBuckets());
+                }
+                if (config.getRequestCacheEnabled() != null) {
+                    command.withRequestCacheEnabled(config.getRequestCacheEnabled());
+                }
+                if (config.getRequestLogEnabled() != null) {
+                    command.withRequestLogEnabled(config.getRequestLogEnabled());
+                }
+                // thread pool
+                if (config.getCorePoolSize() != null) {
+                    threadPool.withCoreSize(config.getCorePoolSize());
+                }
+                if (config.getKeepAliveTime() != null) {
+                    threadPool.withKeepAliveTimeMinutes(config.getKeepAliveTime());
+                }
+                if (config.getMaxQueueSize() != null) {
+                    threadPool.withMaxQueueSize(config.getMaxQueueSize());
+                }
+                if (config.getQueueSizeRejectionThreshold() != null) {
+                    threadPool.withQueueSizeRejectionThreshold(config.getQueueSizeRejectionThreshold());
+                }
+                if (config.getThreadPoolRollingNumberStatisticalWindowInMilliseconds() != null) {
+                    threadPool.withMetricsRollingStatisticalWindowInMilliseconds(config.getThreadPoolRollingNumberStatisticalWindowInMilliseconds());
+                }
+                if (config.getThreadPoolRollingNumberStatisticalWindowBuckets() != null) {
+                    threadPool.withMetricsRollingStatisticalWindowBuckets(config.getThreadPoolRollingNumberStatisticalWindowBuckets());
+                }
+            }
+
+            return new HystrixProcessor(id, setter, processor, fallback);
         } else {
             return null;
         }


Mime
View raw message