I hate to post such a vague question, but I'm totally confused at this
point and could really use some direction.
I have a design where I need multiple brokers peered with each other,
as in the "network of brokers" paradigm. The design requires
that I use the "virtual topics" feature to allow event-driven
consumers without multiple identical actions being fired. Also, I'm
using Spring to configure all this as embedded in my application.
To make sure I'm not fighting with anything external, I've put
together a stripped-down mockup of what I expect to need. This results
in the following Spring context:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.1.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd
">
<context:mbean-server/>
<context:mbean-export/>
<bean id="propertyConfigurer"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="systemPropertiesModeName"
value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/>
</bean>
<broker id="broker"
brokerName="${system.hostname}"
dataDirectory="${data.directory}"
persistent="true"
useJmx="true"
xmlns="http://activemq.apache.org/schema/core">
<networkConnectors>
<networkConnector name="default-nc" uri="multicast://default">
<excludedDestinations>
<queue
physicalName="Consumer.A.VirtualTopic.com.dotsub.tcms.TestTopic"/>
<queue
physicalName="Consumer.B.VirtualTopic.com.dotsub.tcms.TestTopic"/>
</excludedDestinations>
</networkConnector>
</networkConnectors>
<transportConnectors>
<transportConnector uri="vm://${system.hostname}"
discoveryUri="multicast://default"/>
</transportConnectors>
</broker>
<amq:connectionFactory id="jmsFactory"
brokerURL="vm://${system.hostname}?create=false"/>
<amq:topic id="testVTopic"
physicalName="VirtualTopic.com.dotsub.tcms.TestTopic"/>
<bean id="topicTemplate"
class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<bean
class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory"
ref="jmsFactory"/>
</bean>
</property>
<property name="defaultDestination" ref="testVTopic"/>
</bean>
<bean id="randomMessageProducerA"
class="com.dotsub.jmstest.RandomMessageProducer" init-method="start"
destroy-method="stop" depends-on="broker">
<property name="jmsTemplate" ref="topicTemplate"/>
<property name="maximumDelay" value="18"/>
<property name="minimumDelay" value="3"/>
<property name="label" value="${system.hostname}-A"/>
</bean>
<bean id="dumbConsumerA" class="com.dotsub.jmstest.DumbConsumer">
<property name="delay" value="500"/>
</bean>
<bean id="dumbConsumerB" class="com.dotsub.jmstest.DumbConsumer">
<property name="delay" value="11000"/>
</bean>
<jms:listener-container connection-factory="jmsFactory"
concurrency="1" acknowledge="transacted">
<jms:listener
destination="Consumer.A.VirtualTopic.com.dotsub.tcms.TestTopic"
ref="dumbConsumerA"
method="consumeMessage"/>
<jms:listener
destination="Consumer.B.VirtualTopic.com.dotsub.tcms.TestTopic"
ref="dumbConsumerB"
method="consumeMessage"/>
</jms:listener-container>
</beans>
What I expect to happen is:
- A single broker is instantiated with an in-VM transport
and a multicast network transport
- My test producer sends messages to the virtual topic
- My pair of test consumers consume from the queues created against
the above virtual topic
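
To spell out the naming convention I'm relying on: as far as I
understand ActiveMQ's default virtual topic setup, a message published
to VirtualTopic.<name> is copied to every queue named
Consumer.<consumer>.VirtualTopic.<name>. Here is a minimal sketch of
that mapping. The VirtualTopicNames class and its consumerQueue method
are hypothetical helpers written for illustration, not part of
ActiveMQ or of my test project.

```java
public class VirtualTopicNames {

    // Default prefix ActiveMQ uses to recognise virtual topics
    // (assumption: the broker's virtual destination settings are unchanged).
    static final String TOPIC_PREFIX = "VirtualTopic.";

    // Hypothetical helper: builds the physical queue name that a named
    // consumer would listen on for the given virtual topic.
    static String consumerQueue(String consumerName, String virtualTopic) {
        if (!virtualTopic.startsWith(TOPIC_PREFIX)) {
            throw new IllegalArgumentException(
                "Not a virtual topic name: " + virtualTopic);
        }
        return "Consumer." + consumerName + "." + virtualTopic;
    }

    public static void main(String[] args) {
        String topic = "VirtualTopic.com.dotsub.tcms.TestTopic";
        // Prints the two queue names used by the listeners in the context above.
        System.out.println(consumerQueue("A", topic));
        System.out.println(consumerQueue("B", topic));
    }
}
```

These are the same two queue names that appear in both the
excludedDestinations element and the jms:listener destinations above.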
My testing shows that this seems to be working the way I expect.
However, I get a ton of exceptions in my logs:
14:05:41 DEBUG [Persistence Adaptor Task] amq.AMQPersistenceAdapter| Checkpoint started.
14:05:41 DEBUG [Persistence Adaptor Task] amq.AMQPersistenceAdapter| Marking journal at: offset = 42719, file = 1, size = 337, type = 1
14:05:41 DEBUG [Persistence Adaptor Task] amq.AMQPersistenceAdapter| Checkpoint done.
14:05:49 DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] activemq.ActiveMQSession| ID:plank.local-53338-1220464601759-2:1:1 Transaction Commit
14:05:49 ERROR [VMTransport] TransportConnection.Service| Async error occurred: java.lang.IllegalArgumentException: The subscription does not exist: ID:plank.local-53338-1220464601759-2:1:1:1
java.lang.IllegalArgumentException: The subscription does not exist: ID:plank.local-53338-1220464601759-2:1:1:1
Is this normal or a result of my configuration?
Secondly, if I fire up a second instance of this (in a second VM)
with the test producer *disabled*, the consumers in this second VM do
not get any messages. I guess this sort of makes sense. I would really
like all free consumers to get messages though, whether they are in the
same node or not. Am I totally confused?
I can provide my full test environment, which is a maven build, if
that helps.
Thanks,
A.
--
www.sherman.ca / +1 613 797 6819
"When the burning husks of your startups warm the last of your bones,
remember I told you so." - Zed