activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Harry Co" <harryco.ma...@gmail.com>
Subject ActiveMQ 4.0.2 and JMSXGroupID. Is the functionality broken?
Date Thu, 19 Apr 2007 10:51:42 GMT
Hi all,



I'm just trying to evaluate the JMSXGroupID functionality in ActiveMQ 4.0.2.

But it does not seem to be working as described on the ActiveMQ page (
http://activemq.apache.org/message-groups.html).
This page says :

> When a message is being dispatched to a consumer, the JMSXGroupID is
> checked. If one is present then the broker checks to see if a consumer owns
> that message group. (Since there could be a massive number of individual
> message groups we use hash buckets rather than the actual JMSXGroupID
> string).
>
> If no consumer is associated with a message group a consumer is chosen.
> That JMS MessageConsumer will receive all further messages with the same
> JMSXGroupID value until
>
>    - the consumer closes (or the client which created the consumer dies
>    etc)
>    - someone closes the message group by sending a message with a
>    JMSXGroupSeq value of zero (see below for more details)
>
So I assume (*correct me if I'm wrong*):

   - if I have two consumers on the same topic, only one consumer should
   receive messages published on that topic at any time
      - This does not seem to work. In the following example, when 2
      consumers are started, both receive all messages on the topic.
   - I don't need to define a selector on my consumers to make one of them
   focus on a particular group.
      - This seems to be broken also. If I don't define a selector on
      consumers they all receive all messages from the topic.

*Any help appreciated.*



TIA



Harry,



*package* com.test;



*import* javax.jms.Connection;

*import* javax.jms.DeliveryMode;

*import* javax.jms.Destination;

*import* javax.jms.JMSException;

*import* javax.jms.MessageProducer;

*import* javax.jms.Session;

*import* javax.jms.Topic;



*import* org.apache.activemq.ActiveMQConnectionFactory;

*import* org.apache.activemq.command.ActiveMQTextMessage;



*public* *class* GreetingCardFromHolidays {



       *private* String where;



       *private* *int* holidaysDelay;



       Connection connection;



       *private* Session session;



       *private* Topic destination;



       *private* MessageProducer producer;



       *private* *static* *class* CardSender *implements* Runnable {

             *private* GreetingCardFromHolidays from;



             CardSender(GreetingCardFromHolidays from) {

                    *this*.from = from;

             }



             *public* *void* run() {

                    *for* (; *this*.from.holidaysDelay > 0; *this*.from.
holidaysDelay--) {

                           *this*.from.send();

                           *try* {

                                  Thread.*sleep*(2000);

                           } *catch* (InterruptedException e) {

                                  e.printStackTrace();

                           } *finally* {

                                  // Thread.currentThread().notifyAll();

                           }

                    }



             }

       }



       GreetingCardFromHolidays(String where, *int* holidaysDelay) {

             *this*.where = where;

             *this*.holidaysDelay = holidaysDelay;

             *this*.init();

       }


       *public* *static* *void* main(String[] args) {

             *final* GreetingCardFromHolidays tanganika =
*new*GreetingCardFromHolidays(

                           "Tanganika", 2 * 7);

             *final* GreetingCardFromHolidays victoria =
*new*GreetingCardFromHolidays(

                           "Victoria", 4 * 7);

             *new* Thread(*new* CardSender(tanganika), tanganika.where
).start();

             *new* Thread(*new* CardSender(victoria), victoria.where
).start();

       }



       *private* *void* send() {

             *try* {

                    // Create a Session

                    Session session = connection.createSession(*false*,

                                  Session.*AUTO_ACKNOWLEDGE*);



                    // Create the destination (Topic or Queue)

                    Destination destination = session.createTopic("
GONE.FISHING");



                    // Create a MessageProducer from the Session to the
Topic or Queue

                    MessageProducer producer = session.createProducer
(destination);

                    // producer.setTimeToLive(10000);

                    producer.setDeliveryMode(DeliveryMode.*PERSISTENT*);



                    // Create a message

                    String text = "Hello my friend! From [" + where

                                  + "]. I've gone fishing since [" + *this*.
holidaysDelay

                                  + "] days.";

                    ActiveMQTextMessage message = (ActiveMQTextMessage) *
this*.session

                                  .createTextMessage(text);

                    message.setStringProperty("JMSXGroupID", *this*.where);

                    // message.setStringProperty("where",where);



                    // Tell the producer to send the message

                    *this*.producer.send(message);

                    System.*out*.println(*this* + "Sending card :[" + text +
"]. GroupID ["

                                  + message.getGroupID() + "]");



             } *catch* (Exception e) {

                    System.*out*.println("Caught: " + e);

                    e.printStackTrace();

             }

       }



       *private* *void* init() {

             // Create a ConnectionFactory

             ActiveMQConnectionFactory connectionFactory =
*new*ActiveMQConnectionFactory(

                           "tcp://localhost:61616");



             // Create a Connection

             *try* {

                    *this*.connection = connectionFactory.createConnection(*
this*.where,

                                  *this*.where);

                    connection.setClientID(*this*.where);

                    connection.start();

                    // Create a Session

                    *this*.session = connection.createSession(*false*,

                                  Session.*AUTO_ACKNOWLEDGE*);



                    // Create the destination (Topic or Queue)

                    *this*.destination = session.createTopic("GONE.FISHING"
);



                    // Create a MessageProducer from the Session to the
Topic or Queue

                    *this*.producer = session.createProducer(destination);

                    // producer.setTimeToLive(10000);

                    *this*.producer.setDeliveryMode(DeliveryMode.*PERSISTENT
*);



             } *catch* (JMSException e) {

                    e.printStackTrace();

             }



       }



       *protected* *void* finalize() *throws* Throwable {

             System.*out*.println("Finalizing [" + *this* + "]");

             // Clean up

             *if* (*this*.session != *null*)

                    session.close();

             *if* (*this*.connection != *null*)

                    *this*.connection.close();

             *super*.finalize();

       }



       *public* String toString() {

             *return* "On holiday :: " + *this*.where;

       }

}





*package* com.test;



*import* javax.jms.Connection;

*import* javax.jms.Destination;

*import* javax.jms.JMSException;

*import* javax.jms.Message;

*import* javax.jms.MessageConsumer;

*import* javax.jms.MessageListener;

*import* javax.jms.Session;

*import* javax.jms.TextMessage;



*import* org.apache.activemq.ActiveMQConnectionFactory;



*public* *class* GreetingsCardWaiter {

       *private* String where;

       Connection connection;

       Session session;

       *private* *static* *class* CardReceiver *implements* Runnable{

             *private* GreetingsCardWaiter from;

             CardReceiver(GreetingsCardWaiter from){

                    *this*.from=from;

             }

             *public* *void* run(){

                    *this*.from.waitForAcard();

             }

       }

       GreetingsCardWaiter(String where){

             *this*.where = where;

             *this*.init();

       }

       *public* *static* *void* main(String[] args) {

             *new* Thread(*new*
CardReceiver(*new*GreetingsCardWaiter(args[0])), args[0]).start();

       }

       *private* *void* waitForAcard(){

             *try* {

                    // Create the destination (Topic or Queue)

                    Destination destination = session.createTopic("
GONE.FISHING");



                    // Create a MessageProducer from the Session to the
Topic or Queue

                    MessageConsumer consumer = session
.createConsumer(destination);

                    *final* String receiver = *this*.toString();

                    consumer.setMessageListener(*new* MessageListener(){

                           *public* *void* onMessage(Message arg0) {

                                  *try* {

                                        System.*out*.println(receiver + " >>
COOL!!! I've received a card from:["+arg0.getStringProperty("JMSXGroupID")+
"]."+System.*getProperty*("line.separator")+"       Card message is "
+((TextMessage)arg0).getText());

                                  } *catch* (JMSException e) {

                                        // *TODO* Auto-generated catch block

                                        e.printStackTrace();

                                  }

                           }



                    });



                    *this*.connection.start();

                    System.*out*.println(receiver + " starts waiting for new
Cards. Selector is ["+consumer.getMessageSelector()+"]");

                    // Clean up

             } *catch* (Exception e) {

                    System.*out*.println("Caught: " + e);

                    e.printStackTrace();

             }



       }

       *private* *void* init() {

             // Create a ConnectionFactory

             ActiveMQConnectionFactory connectionFactory =
*new*ActiveMQConnectionFactory(

                           "tcp://dmc17525:61616");



             // Create a Connection

             *try* {

                    *this*.connection = connectionFactory.createConnection(*
this*.where, *this*.where);

                    connection.setClientID("Waiting on -" + *this*.where + "
"+ System.*currentTimeMillis*());

                    // Create a Session

                    session = connection.createSession(*false*,

                                  Session.*AUTO_ACKNOWLEDGE*);



             } *catch* (JMSException e) {

                    e.printStackTrace();

             }



}



       *protected* *void* finalize() *throws* Throwable {

             System.*out*.println("Finalizing [" + *this* +"]");

             *if*(*this*.session!=*null*)session.close();

             *if*(*this*.connection!=*null*)         *this*.connection
.close();

             *super*.finalize();

       }

       *public* String toString(){

             *return* "Card receiver :: " + *this*.where;

       }

}

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message