Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BBAAC180AC for ; Tue, 21 Jul 2015 15:11:36 +0000 (UTC) Received: (qmail 89707 invoked by uid 500); 21 Jul 2015 15:11:02 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 89660 invoked by uid 500); 21 Jul 2015 15:11:02 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 89635 invoked by uid 99); 21 Jul 2015 15:11:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Jul 2015 15:11:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 235C5E0283; Tue, 21 Jul 2015 15:11:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Tue, 21 Jul 2015 15:11:02 -0000 Message-Id: <1eb7ff7178674cd291ddf4bffe3c5ff0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] camel git commit: CAMEL-8526: Add more EIP as specialized mbeans Repository: camel Updated Branches: refs/heads/master 4de78cea6 -> edb9fd303 CAMEL-8526: Add more EIP as specialized mbeans Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f3d9749 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f3d9749 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f3d9749 Branch: refs/heads/master Commit: 3f3d974956fefe0ef163d49ce3f1d19f43af2e2b Parents: 4de78ce Author: Claus Ibsen Authored: Tue Jul 21 16:59:47 2015 +0200 Committer: Claus Ibsen Committed: Tue Jul 21 16:59:47 2015 +0200 ---------------------------------------------------------------------- .../mbean/ManagedResequencerMBean.java | 47 ++++++++ .../DefaultManagementObjectStrategy.java | 7 ++ .../management/mbean/ManagedResequencer.java | 116 +++++++++++++++++++ .../camel/model/ConvertBodyDefinition.java | 2 - .../org/apache/camel/model/ExpressionNode.java | 2 - .../org/apache/camel/model/FromDefinition.java | 4 +- .../apache/camel/model/ProcessDefinition.java | 2 - .../camel/model/ResequenceDefinition.java | 8 +- .../org/apache/camel/model/SendDefinition.java | 5 +- .../apache/camel/model/SetHeaderDefinition.java | 2 - .../camel/model/SetOutHeaderDefinition.java | 3 - .../camel/model/SetPropertyDefinition.java | 2 - .../model/language/ExpressionDefinition.java | 4 +- .../apache/camel/processor/BatchProcessor.java | 24 ++++ .../camel/processor/StreamResequencer.java | 13 ++- .../management/ManagedResequencerTest.java | 94 +++++++++++++++ 16 files changed, 308 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedResequencerMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedResequencerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedResequencerMBean.java new file mode 100644 index 0000000..8eb9b4f --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedResequencerMBean.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.api.management.mbean; + +import org.apache.camel.api.management.ManagedAttribute; + +public interface ManagedResequencerMBean extends ManagedProcessorMBean { + + @ManagedAttribute(description = "Expression to use for re-ordering the messages, such as a header with a sequence number") + String getExpression(); + + @ManagedAttribute(description = "The size of the batch to be re-ordered. The default size is 100.") + Integer getBatchSize(); + + @ManagedAttribute(description = "Minimum time to wait for missing elements (messages).") + Long getTimeout(); + + @ManagedAttribute(description = "Whether to allow duplicates.") + Boolean isAllowDuplicates(); + + @ManagedAttribute(description = "Whether to reverse the ordering.") + Boolean isReverse(); + + @ManagedAttribute(description = "Whether to ignore invalid exchanges") + Boolean isIgnoreInvalidExchanges(); + + @ManagedAttribute(description = "The capacity of the resequencer's inbound queue") + Integer getCapacity(); + + @ManagedAttribute(description = "If true, throws an exception when messages older than the last delivered message are processed") + Boolean isRejectOld(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java index b00f746..661f914 100644 --- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java @@ -52,6 +52,7 @@ import org.apache.camel.management.mbean.ManagedPollEnricher; import org.apache.camel.management.mbean.ManagedProcessor; import org.apache.camel.management.mbean.ManagedProducer; import org.apache.camel.management.mbean.ManagedRecipientList; +import org.apache.camel.management.mbean.ManagedResequencer; import org.apache.camel.management.mbean.ManagedRoute; import org.apache.camel.management.mbean.ManagedRoutingSlip; import org.apache.camel.management.mbean.ManagedScheduledPollConsumer; @@ -77,9 +78,11 @@ import org.apache.camel.processor.MulticastProcessor; import org.apache.camel.processor.Pipeline; import org.apache.camel.processor.PollEnricher; import org.apache.camel.processor.RecipientList; +import org.apache.camel.processor.Resequencer; import org.apache.camel.processor.RoutingSlip; import org.apache.camel.processor.SendDynamicProcessor; import org.apache.camel.processor.SendProcessor; +import org.apache.camel.processor.StreamResequencer; import org.apache.camel.processor.Throttler; import org.apache.camel.processor.ThroughputLogger; import org.apache.camel.processor.WireTapProcessor; @@ -226,6 +229,10 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy answer = new ManagedRecipientList(context, (RecipientList) target, definition); } else if (target instanceof MulticastProcessor) { answer = new ManagedMulticast(context, (MulticastProcessor) target, definition); + } else if (target instanceof Resequencer) { + answer = new ManagedResequencer(context, (Resequencer) target, definition); + } else if (target instanceof StreamResequencer) { + answer = new ManagedResequencer(context, (StreamResequencer) target, definition); } else if (target instanceof WireTapProcessor) { answer = new ManagedWireTapProcessor(context, (WireTapProcessor) target, definition); } else if (target instanceof SendDynamicProcessor) { http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedResequencer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedResequencer.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedResequencer.java new file mode 100644 index 0000000..a3429cc --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedResequencer.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management.mbean; + +import org.apache.camel.CamelContext; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.api.management.mbean.ManagedResequencerMBean; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.processor.Resequencer; +import org.apache.camel.processor.StreamResequencer; + +/** + * @version + */ +@ManagedResource(description = "Managed Resequencer") +public class ManagedResequencer extends ManagedProcessor implements ManagedResequencerMBean { + private final Resequencer processor; + private final StreamResequencer streamProcessor; + private final String expression; + + public ManagedResequencer(CamelContext context, Resequencer processor, ProcessorDefinition definition) { + super(context, processor, definition); + this.processor = processor; + this.streamProcessor = null; + this.expression = processor.getExpression().toString(); + } + + public ManagedResequencer(CamelContext context, StreamResequencer processor, ProcessorDefinition definition) { + super(context, processor, definition); + this.processor = null; + this.streamProcessor = processor; + this.expression = streamProcessor.getExpression().toString(); + } + + @Override + public String getExpression() { + return expression; + } + + @Override + public Integer getBatchSize() { + if (processor != null) { + return processor.getBatchSize(); + } else { + return null; + } + } + + @Override + public Long getTimeout() { + if (processor != null) { + return processor.getBatchTimeout(); + } else { + return streamProcessor.getTimeout(); + } + } + + @Override + public Boolean isAllowDuplicates() { + if (processor != null) { + return processor.isAllowDuplicates(); + } else { + return null; + } + } + + @Override + public Boolean isReverse() { + if (processor != null) { + return processor.isReverse(); + } else { + return null; + } + } + + @Override + public Boolean isIgnoreInvalidExchanges() { + if (processor != null) { + return processor.isIgnoreInvalidExchanges(); + } else { + return streamProcessor.isIgnoreInvalidExchanges(); + } + } + + @Override + public Integer getCapacity() { + if (processor != null) { + return null; + } else { + return streamProcessor.getCapacity(); + } + } + + @Override + public Boolean isRejectOld() { + if (processor != null) { + return null; + } else { + return streamProcessor.isRejectOld(); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/ConvertBodyDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ConvertBodyDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ConvertBodyDefinition.java index a2fbac8..402790e 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ConvertBodyDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ConvertBodyDefinition.java @@ -27,7 +27,6 @@ import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.Processor; import org.apache.camel.processor.ConvertBodyProcessor; import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.Required; import org.apache.camel.spi.RouteContext; /** @@ -103,7 +102,6 @@ public class ConvertBodyDefinition extends NoOutputDefinition return expression; } - @Required public void setExpression(ExpressionDefinition expression) { this.expression = expression; } http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/FromDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/FromDefinition.java b/camel-core/src/main/java/org/apache/camel/model/FromDefinition.java index 3c1fc1f..bbb8428 100644 --- a/camel-core/src/main/java/org/apache/camel/model/FromDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/FromDefinition.java @@ -24,7 +24,6 @@ import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.Endpoint; import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.Required; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.ObjectHelper; @@ -37,7 +36,7 @@ import org.apache.camel.util.ObjectHelper; @XmlRootElement(name = "from") @XmlAccessorType(XmlAccessType.FIELD) public class FromDefinition extends OptionalIdentifiedDefinition implements EndpointRequiredDefinition { - @XmlAttribute + @XmlAttribute @Metadata(required = "true") private String uri; @XmlAttribute @Deprecated @@ -96,7 +95,6 @@ public class FromDefinition extends OptionalIdentifiedDefinition * * @param uri the endpoint URI to use */ - @Required public void setUri(String uri) { clear(); this.uri = uri; http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java index b5f9f44..7d2b818 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessDefinition.java @@ -28,7 +28,6 @@ import org.apache.camel.Service; import org.apache.camel.processor.DelegateAsyncProcessor; import org.apache.camel.processor.DelegateSyncProcessor; import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.Required; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.ObjectHelper; @@ -84,7 +83,6 @@ public class ProcessDefinition extends NoOutputDefinition { /** * Reference to the {@link Processor} to lookup in the registry to use. */ - @Required public void setRef(String ref) { this.ref = ref; } http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java index f74abee..7f26a4f 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java @@ -37,7 +37,6 @@ import org.apache.camel.processor.Resequencer; import org.apache.camel.processor.StreamResequencer; import org.apache.camel.processor.resequencer.ExpressionResultComparator; import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.Required; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.ObjectHelper; @@ -61,8 +60,7 @@ public class ResequenceDefinition extends ProcessorDefinition> outputs = new ArrayList>(); @@ -364,6 +362,8 @@ public class ResequenceDefinition extends ProcessorDefinition> extends NoOutputDefinition implements EndpointRequiredDefinition { - @XmlAttribute + @XmlAttribute @Metadata(required = "true") protected String uri; @XmlAttribute @Deprecated @@ -99,7 +99,6 @@ public abstract class SendDefinition> ext * * @param uri the uri of the endpoint */ - @Required public void setUri(String uri) { this.uri = uri; } http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/SetHeaderDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/SetHeaderDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SetHeaderDefinition.java index 13ebb47..6d1f794e 100644 --- a/camel-core/src/main/java/org/apache/camel/model/SetHeaderDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/SetHeaderDefinition.java @@ -27,7 +27,6 @@ import org.apache.camel.builder.ExpressionBuilder; import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.processor.SetHeaderProcessor; import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.Required; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.ObjectHelper; @@ -88,7 +87,6 @@ public class SetHeaderDefinition extends NoOutputExpressionNode { /** * Name of message header to set a new value */ - @Required public void setHeaderName(String headerName) { this.headerName = headerName; } http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/SetOutHeaderDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/SetOutHeaderDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SetOutHeaderDefinition.java index dc405f1..5493f08 100644 --- a/camel-core/src/main/java/org/apache/camel/model/SetOutHeaderDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/SetOutHeaderDefinition.java @@ -26,7 +26,6 @@ import org.apache.camel.Processor; import org.apache.camel.builder.ProcessorBuilder; import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.Required; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.ObjectHelper; @@ -40,7 +39,6 @@ import org.apache.camel.util.ObjectHelper; @XmlAccessorType(XmlAccessType.FIELD) @Deprecated public class SetOutHeaderDefinition extends NoOutputExpressionNode { - @Deprecated @XmlAttribute(required = true) private String headerName; @@ -86,7 +84,6 @@ public class SetOutHeaderDefinition extends NoOutputExpressionNode { /** * Name of message header to set a new value */ - @Required public void setHeaderName(String headerName) { this.headerName = headerName; } http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/SetPropertyDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/SetPropertyDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SetPropertyDefinition.java index eccb3d2..a0c1376 100644 --- a/camel-core/src/main/java/org/apache/camel/model/SetPropertyDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/SetPropertyDefinition.java @@ -27,7 +27,6 @@ import org.apache.camel.builder.ExpressionBuilder; import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.processor.SetPropertyProcessor; import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.Required; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.ObjectHelper; @@ -88,7 +87,6 @@ public class SetPropertyDefinition extends NoOutputExpressionNode { /** * Name of exchange property to set a new value */ - @Required public void setPropertyName(String propertyName) { this.propertyName = propertyName; } http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/model/language/ExpressionDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/language/ExpressionDefinition.java b/camel-core/src/main/java/org/apache/camel/model/language/ExpressionDefinition.java index bd3f4e9..cf8b1a5 100644 --- a/camel-core/src/main/java/org/apache/camel/model/language/ExpressionDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/language/ExpressionDefinition.java @@ -37,7 +37,6 @@ import org.apache.camel.Predicate; import org.apache.camel.model.OtherAttributesAware; import org.apache.camel.spi.Language; import org.apache.camel.spi.Metadata; -import org.apache.camel.spi.Required; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.CollectionStringBuffer; import org.apache.camel.util.ExpressionToPredicateAdapter; @@ -56,7 +55,7 @@ public class ExpressionDefinition implements Expression, Predicate, OtherAttribu @XmlAttribute @XmlID private String id; - @XmlValue + @XmlValue @Metadata(required = "true") private String expression; @XmlAttribute @Metadata(defaultValue = "true") private Boolean trim; @@ -209,7 +208,6 @@ public class ExpressionDefinition implements Expression, Predicate, OtherAttribu /** * The expression value in your chosen language syntax */ - @Required public void setExpression(String expression) { this.expression = expression; } http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java index b3c8f05..797c89f 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java @@ -68,6 +68,8 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na private boolean groupExchanges; private boolean batchConsumer; private boolean ignoreInvalidExchanges; + private boolean reverse; + private boolean allowDuplicates; private Predicate completionPredicate; private Expression expression; @@ -100,6 +102,12 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na // Properties // ------------------------------------------------------------------------- + + + public Expression getExpression() { + return expression; + } + public ExceptionHandler getExceptionHandler() { return exceptionHandler; } @@ -176,6 +184,22 @@ public class BatchProcessor extends ServiceSupport implements AsyncProcessor, Na this.ignoreInvalidExchanges = ignoreInvalidExchanges; } + public boolean isReverse() { + return reverse; + } + + public void setReverse(boolean reverse) { + this.reverse = reverse; + } + + public boolean isAllowDuplicates() { + return allowDuplicates; + } + + public void setAllowDuplicates(boolean allowDuplicates) { + this.allowDuplicates = allowDuplicates; + } + public Predicate getCompletionPredicate() { return completionPredicate; } http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java index 71af68f..e24b056 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java @@ -28,6 +28,7 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; +import org.apache.camel.Expression; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Traceable; @@ -76,6 +77,7 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender< private final ExceptionHandler exceptionHandler; private final ResequencerEngine engine; private final Processor processor; + private final Expression expression; private Delivery delivery; private int capacity; private boolean ignoreInvalidExchanges; @@ -86,15 +88,20 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender< * @param processor next processor that processes re-ordered exchanges. * @param comparator a sequence element comparator for exchanges. */ - public StreamResequencer(CamelContext camelContext, Processor processor, SequenceElementComparator comparator) { + public StreamResequencer(CamelContext camelContext, Processor processor, SequenceElementComparator comparator, Expression expression) { ObjectHelper.notNull(camelContext, "CamelContext"); this.camelContext = camelContext; this.engine = new ResequencerEngine(comparator); this.engine.setSequenceSender(this); this.processor = processor; + this.expression = expression; this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass()); } + public Expression getExpression() { + return expression; + } + /** * Returns this resequencer's exception handler. */ @@ -150,6 +157,10 @@ public class StreamResequencer extends ServiceSupport implements SequenceSender< engine.setRejectOld(rejectOld); } + public boolean isRejectOld() { + return engine.getRejectOld() != null && engine.getRejectOld(); + } + /** * Sets whether to ignore invalid exchanges which cannot be used by this stream resequencer. *

http://git-wip-us.apache.org/repos/asf/camel/blob/3f3d9749/camel-core/src/test/java/org/apache/camel/management/ManagedResequencerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedResequencerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedResequencerTest.java new file mode 100644 index 0000000..8d5ce62 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedResequencerTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.openmbean.TabularData; + +import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version + */ +public class ManagedResequencerTest extends ManagementTestSupport { + + public void testManageResequencer() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + MockEndpoint foo = getMockEndpoint("mock:foo"); + foo.expectedBodiesReceived("A", "B", "C"); + + template.sendBodyAndHeader("direct:start", "B", "num", "2"); + template.sendBodyAndHeader("direct:start", "C", "num", "3"); + template.sendBodyAndHeader("direct:start", "A", "num", "1"); + + assertMockEndpointsSatisfied(); + + // get the stats for the route + MBeanServer mbeanServer = getMBeanServer(); + + // get the object name for the delayer + ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"mysend\""); + + // should be on route1 + String routeId = (String) mbeanServer.getAttribute(on, "RouteId"); + assertEquals("route1", routeId); + + String camelId = (String) mbeanServer.getAttribute(on, "CamelId"); + assertEquals("camel-1", camelId); + + String state = (String) mbeanServer.getAttribute(on, "State"); + assertEquals(ServiceStatus.Started.name(), state); + + String uri = (String) mbeanServer.getAttribute(on, "Expression"); + assertEquals("header(num)", uri); + + Integer size = (Integer) mbeanServer.getAttribute(on, "BatchSize"); + assertEquals(3, size.intValue()); + + TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"}); + assertNotNull(data); + assertEquals(2, data.size()); + + data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"}); + assertNotNull(data); + assertEquals(5, data.size()); + + String json = (String) mbeanServer.invoke(on, "informationJson", null, null); + assertNotNull(json); + assertTrue(json.contains("\"description\": \"Resequences (re-order) messages based on an expression")); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .resequence(header("num")).size(3).id("mysend") + .to("mock:foo"); + } + }; + } + +}