activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Quinn Stevenson <qu...@pronoia-solutions.com>
Subject Re: Multiple virtual topics and messages routing to consumers
Date Wed, 16 Mar 2016 14:03:18 GMT
Looking at your sample, I think your destination name is causing you problems.  The “>”
character is for pattern matching in the configs and wildcard subscribers - I don’t think
wildcard producers are supported.  Can you try it with a different destination name?  Anything
other than “>”.

My sample is a Camel route - I’ve pasted the Spring XML file and the unit test for it below.

— begin camel-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation=
               "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
                http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.13.0.xsd">

    <!--
        Create an embedded ActiveMQ broker named "embedded-broker" with JMX and
        persistence both switched off (useJmx="false", persistent="false").
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="embedded-broker"
useJmx="false" persistent="false">
        <!--
            The explicit virtual topic definitions below are intentionally left
            commented out, so this broker declares no destination interceptors of
            its own and the sample runs on the broker defaults.
        -->
        <!--
        <destinationInterceptors>
            <virtualDestinationInterceptor>
                <virtualDestinations>
                    <virtualTopic name="VirtualTopic.MIGRATION" prefix="Consumer.*.VirtualTopic."/>
                    <virtualTopic name="VirtualTopic.>" prefix="Consumer.*.VirtualTopic."/>
                    <virtualTopic name="VirtualTopic.>" prefix="Consumer.*."/>
                </virtualDestinations>
            </virtualDestinationInterceptor>
        </destinationInterceptors>
        -->
    </broker>

    <!--
        Camel "activemq" component used by every activemq:// endpoint in the routes.
        Its broker URL targets the broker named "embedded-broker" through the vm
        transport, wrapped in the failover transport.
        NOTE(review): create=false presumably stops the vm transport from creating a
        broker of its own; confirm against the ActiveMQ VM transport reference.
    -->
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="brokerURL" value="failover:vm://embedded-broker?create=false"/>
    </bean>

    <!--
        Five routes: three consumers that read from Consumer.* queues and hand the
        messages to mock endpoints, and two producers that publish messages received
        on direct endpoints to the VirtualTopic.MIGRATION and VirtualTopic.OTHER topics.
    -->
    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <!-- Consumer: queue Consumer.MigrationConsumer.VirtualTopic.MIGRATION to mock://MigrationConsumer -->
        <route id="MIGRATION-consumer">
            <from uri="activemq://queue:Consumer.MigrationConsumer.VirtualTopic.MIGRATION"
/>
            <log message="${routeId} received message: ${body}" />
            <to uri="mock://MigrationConsumer" />
        </route>

        <!-- Consumer: queue Consumer.OtherConsumerOne.VirtualTopic.OTHER to mock://OtherConsumerOne -->
        <route id="other-consumer-one">
            <from uri="activemq://queue:Consumer.OtherConsumerOne.VirtualTopic.OTHER" />
            <log message="${routeId} received message: ${body}" />
            <to uri="mock://OtherConsumerOne" />
        </route>

        <!-- Consumer: queue Consumer.OtherConsumerTwo.VirtualTopic.OTHER to mock://OtherConsumerTwo -->
        <route id="other-consumer-two">
            <from uri="activemq://queue:Consumer.OtherConsumerTwo.VirtualTopic.OTHER" />
            <log message="${routeId} received message: ${body}" />
            <to uri="mock://OtherConsumerTwo" />
        </route>

        <!-- Producer: direct://migration-message-producer to topic VirtualTopic.MIGRATION -->
        <route id="migration-message-producer">
            <from uri="direct://migration-message-producer" />
            <to uri="activemq://topic:VirtualTopic.MIGRATION" />
        </route>

        <!-- Producer: direct://other-message-producer to topic VirtualTopic.OTHER -->
        <route id="other-message-producer">
            <from uri="direct://other-message-producer" />
            <to uri="activemq://topic:VirtualTopic.OTHER" />
        </route>
    </camelContext>

</beans>
— end camel-context.xml

— begin RouteTest.java
package com.pronoia.camel;

import java.util.concurrent.TimeUnit;

import org.apache.camel.EndpointInject;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.NotifyBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.spring.CamelSpringTestSupport;
import org.junit.Test;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Sends messages to two virtual topics through the routes loaded from
 * {@code META-INF/spring/camel-context.xml} and checks which mock consumers
 * receive them.
 *
 * <p>Messages sent to {@code direct://migration-message-producer} are expected
 * only at {@code mock://MigrationConsumer}; messages sent to
 * {@code direct://other-message-producer} are expected at both
 * {@code mock://OtherConsumerOne} and {@code mock://OtherConsumerTwo}.
 */
public class RouteTest extends CamelSpringTestSupport {
    @Produce(uri = "direct://migration-message-producer")
    ProducerTemplate migrationProducer;

    @Produce(uri = "direct://other-message-producer")
    ProducerTemplate otherProducer;

    @EndpointInject(uri = "mock://MigrationConsumer")
    MockEndpoint migrationConsumer;

    @EndpointInject(uri = "mock://OtherConsumerOne")
    MockEndpoint otherConsumerOne;

    @EndpointInject(uri = "mock://OtherConsumerTwo")
    MockEndpoint otherConsumerTwo;

    /**
     * Loads the Spring application context that defines the broker and the routes.
     *
     * @return the context built from {@code META-INF/spring/camel-context.xml}
     */
    @Override
    protected AbstractApplicationContext createApplicationContext() {
        return new ClassPathXmlApplicationContext("META-INF/spring/camel-context.xml");
    }

    /**
     * Returns {@code true}; the test then starts the Camel context itself with
     * {@code context.start()} after all expectations have been registered.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isUseAdviceWith() {
        return true;
    }

    /**
     * Publishes 5 "Migration Message n" bodies and 10 "Other Message n" bodies,
     * waits for both producer routes to have received them, and then asserts the
     * message count and body prefix on every mock consumer.
     *
     * @throws Exception if the context fails to start or an expectation is not met
     */
    @Test
    public void testRoute() throws Exception {
        String migrationMessagePrefix = "Migration Message ";
        String otherMessagePrefix = "Other Message ";

        int migrationMessageCount = 5;
        int otherMessageCount = 10;

        NotifyBuilder notifyMigrationProducer = new NotifyBuilder(context)
                .from("direct://migration-message-producer")
                .whenReceived(migrationMessageCount)
                .create();
        NotifyBuilder notifyOtherProducer = new NotifyBuilder(context)
                .from("direct://other-message-producer")
                .whenReceived(otherMessageCount)
                .create();

        migrationConsumer.expectedMessageCount(migrationMessageCount);
        otherConsumerOne.expectedMessageCount(otherMessageCount);
        otherConsumerTwo.expectedMessageCount(otherMessageCount);

        migrationConsumer.allMessages().body().startsWith(migrationMessagePrefix);
        otherConsumerOne.allMessages().body().startsWith(otherMessagePrefix);
        // Previously this line repeated the check on otherConsumerOne, which left
        // the bodies delivered to otherConsumerTwo unverified.
        otherConsumerTwo.allMessages().body().startsWith(otherMessagePrefix);

        // Expectations are registered before the context is started so that no
        // message can reach a mock endpoint ahead of them.
        context.start();

        for (int i = 1; i <= migrationMessageCount; ++i) {
            migrationProducer.sendBody(migrationMessagePrefix + i);
        }

        for (int i = 1; i <= otherMessageCount; ++i) {
            otherProducer.sendBody(otherMessagePrefix + i);
        }

        assertTrue("All migration messages were not sent",
                notifyMigrationProducer.matches(5, TimeUnit.SECONDS));
        assertTrue("All other messages were not sent",
                notifyOtherProducer.matches(5, TimeUnit.SECONDS));

        assertMockEndpointsSatisfied();
    }
}
— end RouteTest.java


> On Mar 15, 2016, at 7:26 PM, Nuno Oliveira <nuno.oliveira@geo-solutions.it> wrote:
> 
> Thank you for the feedback. 
> 
> I have created a simple test case for this based on one of the ActiveMQ basic examples
(attached to this mail - App.java).
> 
> I create two producers, one that will send messages to virtual topic VirtualTopic.>
and another 
> one that will send messages to virtual topic VirtualTopic.MIGRATION.
> 
> And I create two consumers, one that will consume from queue Consumer.ConsumerA.VirtualTopic.>
and another 
> one that will consume from queue Consumer.ConsumerB.VirtualTopic.MIGRATION.
> 
> Sending one message to each virtual topic I would expect something like this:
> 
>    Consumer 'ConsumerA' received: This message was from topic 'VirtualTopic.>'.
>    Consumer 'ConsumerB' received: This message was from topic 'VirtualTopic.MIGRATION'.
> 
> But I get something like this (the messages are replicated to all queues):
> 
>    Consumer 'ConsumerA' received: This message was from topic 'VirtualTopic.>'.
>    Consumer 'ConsumerB' received: This message was from topic 'VirtualTopic.>'.
>    Consumer 'ConsumerA' received: This message was from topic 'VirtualTopic.MIGRATION'.
>    Consumer 'ConsumerB' received: This message was from topic 'VirtualTopic.MIGRATION'.
> 
> I tested this using the default configuration with ActiveMQ versions 5.9.0 and 5.13.2.
> 
> You said that you were able to get this behavior with a sample. I would be very grateful
if you could share that sample :)
> 
> Regards, 
> 
> Le mardi 15 mars 2016 à 13:36 -0600, Quinn Stevenson a écrit :
>> Is there a specific reason the default configuration won’t work for you?  (Either
don’t set anything, or explicitly set the default of
>> <virtualTopic name="VirtualTopic.>" prefix="Consumer.*."/>)
>> 
>> I tried a simple sample using the defaults, and it seems to accomplish what you’re
after.
>> 
>>> On Mar 14, 2016, at 11:05 AM, Nuno Oliveira <nuno.oliveira@geo-solutions.it>
wrote:
>>> 
>>> <virtualTopic name="VirtualTopic.MIGRATION" prefix="Consumer.*.VirtualTopic."/>
>>>               <virtualTopic name="VirtualTopic.>" prefix="Consumer.*.VirtualTopic."/>
>> 
> -- 
> ==
> GeoServer Professional Services from the experts! 
> Visit http://goo.gl/it488V for more information.
> ==
> Nuno Miguel Carvalho Oliveira
> @nmcoliveira
> Software Engineer
> 
> GeoSolutions S.A.S.
> Via di Montramito 3/A
> 55054  Massarosa (LU)
> Italy
> 
> phone: +39 0584 962313
> fax:   +39 0584 1660272
> mob:   +39  333 8128928
> 
> http://www.geo-solutions.it
> http://twitter.com/geosolutions_it
> 
> -------------------------------------------------------
> 
> AVVERTENZE AI SENSI DEL D.Lgs. 196/2003
> Le informazioni contenute in questo messaggio di posta elettronica e/o nel/i file/s allegato/i
sono
> da considerarsi strettamente riservate. Il loro utilizzo è consentito esclusivamente
al destinatario del messaggio, per le finalità indicate
> nel messaggio stesso. Qualora riceviate questo messaggio senza esserne il destinatario,
Vi preghiamo cortesemente di darcene notizia via e
> -mail e di procedere alla distruzione del messaggio stesso, cancellandolo dal Vostro
sistema. Conservare il messaggio stesso, divulgarlo
> anche in parte, distribuirlo ad altri soggetti, copiarlo, od utilizzarlo per finalità
diverse, costituisce comportamento contrario ai
> principi dettati dal D.Lgs. 196/2003.
> 
> The information in this message and/or attachments, is intended solely for the attention
and use of
> the named addressee(s) and may be confidential or proprietary in nature or covered by
the provisions of privacy act (Legislative Decree
> June, 30 2003, no.196 - Italy's New Data Protection Code).Any use not in accord with
its purpose, any disclosure, reproduction, copying,
> distribution, or either dissemination, either whole or partial, is strictly forbidden
except previous formal approval of the named
> addressee(s). If you are not the intended recipient, please contact immediately the sender
by telephone, fax or e-mail and delete the
> information in this message that has been received in error. The sender does not give
any warranty or accept liability as the content,
> accuracy or completeness of sent messages and accepts no responsibility  for changes
made after they were sent or for other risks which
> arise as a result of e-mail transmission, viruses, etc.
> <App.java>


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message