activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ChicagoBob123 <b...@bobfx.com>
Subject Newbie questions on NMS or CMS ActiveMQ consumers puzzle
Date Wed, 01 Sep 2010 14:51:41 GMT

The problem I have 2 consumers of one queue and each consumer only seems to
get 1/2 the messages. 

Here is some my consumer code. 
The concept is easy I browse the queue and remove any messages that belonged
to my application
based on an id contained in the message. Then I acknowledge which should
dequeue it (but it did not)

Later I tried the ConsumerReceiveThread() which just does a Receive which
dequeues the items but only gets 1/2 the items sent.. 

In my initialization  I started comsuming the queue with using
AcknowledgementMode.IndividualAcknowledge session flag which I had hoped
would allow me 
to remove only the items I was interested in. 
Why would I get only 1/2 the items? Also would I only get 1/3 the items if I
had 3 consumers? (I guess I should just try that) 


HELP ideas? thanks


//////////// Create the queue
 Uri connecturi = new Uri(ActiveMQURL.Text);
   Apache.NMS.ActiveMQ.ConnectionFactory connect = new
Apache.NMS.ActiveMQ.ConnectionFactory(connecturi);
   connection = connect.CreateConnection();
   connection.Start();
   session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
   destination = SessionUtil.GetDestination(session, QueueName.Text);

   // Create a consumer    
  consumer = session.CreateConsumer(destination);



///////// Consumer thread. 
  public void ConsumerBrowserThread()
   {
    String rec;

    String idtext = "id=\"";
    idtext = idtext + f.ID.Text + "\"";
    StayAlive = true;
    while (StayAlive)
     {
      IQueue q = SessionUtil.GetQueue(f.session, f.QueueName.Text);
      IQueueBrowser qb = f.session.CreateBrowser((IQueue)f.destination);
      System.Collections.IEnumerator me = qb.GetEnumerator();
      while (me.MoveNext())
      {
       IMessage msg = (IMessage)me.Current;
       if(msg == null)
        continue;
       ITextMessage message = (ITextMessage)msg;
       rec = message.Text;
       if (rec.Contains(idtext))
        {
         
         f.MessageRecieved.Text += rec;
         msg.Acknowledge();
        }  
      }
      qb.Close();           
     }
    Thread.Sleep(1000);
   }



public void ConsumerReceiveThread()
  {
   String rec;

   String idtext = "id=\"";
   idtext = idtext + f.ID.Text + "\"";
   StayAlive = true;
   while (StayAlive)
   {
    System.TimeSpan ts = new System.TimeSpan(0,0,1);
    IMessage msg = f.consumer.Receive(ts);
    if(msg != null)
     {
      ITextMessage message = (ITextMessage)msg;
      rec = message.Text;
      if (rec.Contains(idtext))
       {
         f.MessageRecieved.Text += rec;
        msg.Acknowledge();
       }
     }
    Thread.Sleep(1000);
   }
  }
-- 
View this message in context: http://activemq.2283324.n4.nabble.com/Newbie-questions-on-NMS-or-CMS-ActiveMQ-consumers-puzzle-tp2403331p2403331.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
View raw message