activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Quinn Stevenson <qu...@pronoia-solutions.com>
Subject ActiveMQ Broker Camel Component and wildcards
Date Thu, 28 Apr 2016 18:49:04 GMT
I’m trying to use the ActiveMQ Broker Camel Component to add some JMS user properties to
messages as they arrive at the broker.  I’m using a wildcard on the from so I can apply
the same logic to a set of topics or queues, but I can’t seem to send the message back to
the original queue.

Without wildcards, it works fine - something like this:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
            http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <camelContext id="audit-enrichment" xmlns="http://camel.apache.org/schema/spring">
        <route id="in-audit-to-file">
            <from uri="broker://queue:in.adt.epic"/>
            <setHeader headerName="MyCustomHeader">
                <constant>MyHeaderValue</constant>
            </setHeader>
            <to uri="broker://queue:in.adt.epic" />
        </route>

    </camelContext>

</beans>

However when I put in the wildcards, I get and IllegalStateException.  The configuration I’m
trying looks like this
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
            http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <camelContext id="audit-enrichment" xmlns="http://camel.apache.org/schema/spring">
        <route id="in-audit-to-file">
            <from uri="broker://queue:in.adt.*"/>
            <setHeader headerName="MyCustomHeader">
                <constant>MyHeaderValue</constant>
            </setHeader>
            <recipientList>
                <simple>broker://${header[JMSDestination]}</simple>
            </recipientList>
        </route>

    </camelContext>

</beans>

And here’s the exception I get

2016-04-28 12:47:43,721 | ERROR | Failed delivery for (MessageId: ID-macpro-local-53588-1461869253207-0-2
on ExchangeId: ID-macpro-local-53588-1461869253207-0-3). Exhausted after delivery attempt:
1 caught: java.lang.IllegalStateException: Not the original message from the broker Message:
Enter some text here for the message body...

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                      
                                 Elapsed (ms)
[in-audit-to-file  ] [in-audit-to-file  ] [broker://queue:in.adt.*                       
                               ] [        23]
[in-audit-to-file  ] [setHeader1        ] [setHeader[MyCustomHeader]                     
                               ] [         3]
[in-audit-to-file  ] [recipientList1    ] [recipientList[simple{broker://${header[JMSDestination]}}]
                    ] [        18]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
	Id                  ID-macpro-local-53588-1461869253207-0-3
	ExchangePattern     InOnly
	Headers             {breadcrumbId=ID:macpro.local-53587-1461869252118-4:1:1:1:1, CamelRedelivered=false,
CamelRedeliveryCounter=0, JMSCorrelationID=, JMSCorrelationIDAsBytes=, JMSDeliveryMode=1,
JMSDestination=queue://in.adt.epic, JMSExpiration=0, JMSMessageID=ID:macpro.local-53587-1461869252118-4:1:1:1:1,
JMSPriority=0, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1461869263684, JMSType=,
JMSXGroupID=null, JMSXUserID=null, MyCustomHeader=MyHeaderValue}
	BodyType            String
	Body                Enter some text here for the message body...
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
| org.apache.camel.processor.DefaultErrorHandler | ActiveMQ VMTransport: vm://localhost#1
java.lang.IllegalStateException: Not the original message from the broker Message: Enter some
text here for the message body...
	at org.apache.activemq.camel.component.broker.BrokerProducer.checkOriginalMessage(BrokerProducer.java:95)[activemq-camel-5.13.2.jar:5.13.2]
	at org.apache.activemq.camel.component.broker.BrokerProducer.getMessage(BrokerProducer.java:73)[activemq-camel-5.13.2.jar:5.13.2]
	at org.apache.activemq.camel.component.broker.BrokerProducer.processInOnly(BrokerProducer.java:55)[activemq-camel-5.13.2.jar:5.13.2]
	at org.apache.activemq.camel.component.broker.BrokerProducer.process(BrokerProducer.java:43)[activemq-camel-5.13.2.jar:5.13.2]
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:668)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:596)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:237)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.processor.RecipientList.sendToRecipientList(RecipientList.java:178)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.processor.RecipientList.process(RecipientList.java:131)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)[camel-core-2.16.2.jar:2.16.2]
	at org.apache.activemq.camel.component.broker.BrokerConsumer.intercept(BrokerConsumer.java:56)[activemq-camel-5.13.2.jar:5.13.2]
	at org.apache.activemq.broker.inteceptor.MessageInterceptorFilter.send(MessageInterceptorFilter.java:109)[activemq-broker-5.13.2.jar:5.13.2]
	at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:158)[activemq-broker-5.13.2.jar:5.13.2]
	at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:546)[activemq-broker-5.13.2.jar:5.13.2]
	at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:768)[activemq-client-5.13.2.jar:5.13.2]
	at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:338)[activemq-broker-5.13.2.jar:5.13.2]
	at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)[activemq-broker-5.13.2.jar:5.13.2]
	at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)[activemq-client-5.13.2.jar:5.13.2]
	at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)[activemq-client-5.13.2.jar:5.13.2]
	at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:271)[activemq-broker-5.13.2.jar:5.13.2]
	at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:112)[activemq-client-5.13.2.jar:5.13.2]
	at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:42)[activemq-client-5.13.2.jar:5.13.2]
2016-04-28 12:47:43,727 | WARN  | Error processing intercepted message: ActiveMQTextMessage
{commandId = 5, responseRequired = false, messageId = ID:macpro.local-53587-1461869252118-4:1:1:1:1,
originalDestination = null, originalTransactionId = null, producerId = ID:macpro.local-53587-1461869252118-4:1:1:1,
destination = queue://in.adt.epic, transactionId = null, expiration = 0, timestamp = 1461869263684,
arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = , replyTo = null, persistent
= false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure
= null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody
= true, droppable = false, jmsXGroupFirstForConsumer = false, text = Enter some text here
for the message body...}. Exchange[ID-macpro-local-53588-1461869253207-0-1][BrokerJmsMessage[JMSMessageID:
ID:macpro.local-53587-1461869252118-4:1:1:1:1]. Caused by: [java.lang.IllegalStateException
- Not the original message from the broker Message: Enter some text here for the message body...]
| org.apache.activemq.camel.component.broker.BrokerConsumer | ActiveMQ VMTransport: vm://localhost#1

Is there a way to accomplish what I’m after?




Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message