activemq-users mailing list archives

From carlo cancellieri <carlo.cancelli...@geo-solutions.it>
Subject Unable to configure a peer network using xbean
Date Wed, 26 Mar 2014 09:37:06 GMT
Hi all,
I'm trying to set up a couple of producers and consumers using the peer://
protocol.
With setup 1 (the peer broker defined inline as a simple string) I'm able to
communicate between multiple instances of the same program (just substitute
'peer1' with 'peerN').
To get a more configurable setup I'm trying to use the xbean protocol to
point to an external XML file that defines the broker (see setup 2 below).
Now I can send and receive messages within the same instance, but receiving
a message from the topic across multiple instances fails (the message is
delivered only to the instance that produced it).

Where's my error?

The code is essentially the following:
---------------Setup 1---------------------
String brokerURL = "peer://geoserver/peer1";
Topic topic = new ActiveMQTopic("VirtualTopic.>");
ConnectionFactory cf = new PooledConnectionFactory(brokerURL);
Queue queue=new ActiveMQQueue("Consumer.MYConsumer.VirtualTopic.>");
----------------------------------------------

---------------Setup 2---------------------
brokerURL = "vm://peer1?brokerConfig=xbean:file:./activemq.xml";
Topic topic = new ActiveMQTopic("VirtualTopic.>");
ConnectionFactory cf = new PooledConnectionFactory(brokerURL);
Queue queue=new ActiveMQQueue("Consumer.MYConsumer.VirtualTopic.>");
----------------------------------------------
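
For reference, the topic and queue names in both setups follow ActiveMQ's
default virtual topic convention: a message published to a topic named
VirtualTopic.X is made available on queues named Consumer.<name>.VirtualTopic.X.
A minimal sketch of that naming rule is below. The class VirtualTopicNames and
its method are hypothetical helpers written only for illustration; they are
not part of ActiveMQ or of my application.

```java
// Hypothetical helper (not an ActiveMQ API): builds the queue name that a
// consumer reads from under ActiveMQ's default virtual topic convention.
final class VirtualTopicNames {

    private VirtualTopicNames() {
    }

    // Returns "Consumer.<consumerName>.<topicName>".
    static String consumerQueueName(String consumerName, String topicName) {
        if (consumerName == null || consumerName.isEmpty()) {
            throw new IllegalArgumentException("consumerName must not be empty");
        }
        if (topicName == null || !topicName.startsWith("VirtualTopic.")) {
            throw new IllegalArgumentException(
                    "topicName must start with 'VirtualTopic.'");
        }
        return "Consumer." + consumerName + "." + topicName;
    }

    public static void main(String[] args) {
        // prints "Consumer.MYConsumer.VirtualTopic.>"
        System.out.println(consumerQueueName("MYConsumer", "VirtualTopic.>"));
    }
}
```

With the names used above, the producer publishes to "VirtualTopic.>" and the
consumer named MYConsumer listens on "Consumer.MYConsumer.VirtualTopic.>".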

The broker definition (very simple):

--------------activemq.xml--------------
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:amq="http://activemq.apache.org/schema/core"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
 xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task-3.0.xsd
 http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-contex
  http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">

<broker id="broker" brokerName="broker1" persistent="false"
useJmx="true" xmlns="http://activemq.apache.org/schema/core"
 startAsync="true" >

<managementContext>
<managementContext createConnector="false" />
 </managementContext>

<transportConnectors>
<transportConnector uri="peer://geoserver/peer1" />
 </transportConnectors>
</broker>
</beans>

--------------------------------------------

Here is the Spring+ActiveMQ code in use:

The temporary queue listener is defined below.

----------------Registering the queue listener---------------
public class JMSContainer extends DefaultMessageListenerContainer
implements DisposableBean {
 ...
    public JMSContainer(JMSConfiguration config, JMSQueueListener listener) {
        super();

        // force a single consumer (no concurrency)
        setConcurrentConsumers(1);

        // the listener used to handle incoming events
        setMessageListener(listener);

        // configuration
        this.config = config;

    }
---------------------------------------------------------------

The following is a method of my producer, which uses a Spring JmsTemplate
to send messages to the destination.
Note that JMSEventHandler is an SPI specific to my application; it is simply
used to serialize the events coming from my app before they are sent to the
topic. It is also used on the client side to deserialize messages coming
from the temporary queue (see above).

-------------------producing messages------------------
public <S extends Serializable, O> void publish(
            final Topic destination,
            final JmsTemplate jmsTemplate,
            final Properties props,
            final O object) throws JMSException {
        try {

            final JMSEventHandler<S, O> handler = jmsManager.getHandler(object);

            // set the used SPI
            props.put(JMSEventHandlerSPI.getKeyName(),
                    handler.getGeneratorClass().getSimpleName());

            // TODO make this configurable
            final MessageCreator creator =
                    new JMSObjectMessageCreator(handler.serialize(object), props);

            jmsTemplate.send(destination, creator);

        } catch (Exception e) {
            if (LOGGER.isLoggable(java.util.logging.Level.SEVERE)) {
                LOGGER.severe(e.getLocalizedMessage());
            }
            final JMSException ex = new JMSException(e.getLocalizedMessage());
            ex.initCause(e);
            throw ex;
        }
    }
-----------------------------------------------


Many thanks for your replies,
Cheers,
Carlo

-- 

Dott. Carlo Cancellieri
@cancellieric
Software Engineer

GeoSolutions S.A.S.
Via Poggio alle Viti 1187
55054  Massarosa (LU)
Italy
phone: +39 0584 962313
fax:   +39 0584 1660272

http://www.geo-solutions.it
http://twitter.com/geosolutions_it
