activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From etiennet123 <etie...@lambdasales.com>
Subject Problem Consuming from queue with more than one consumer
Date Wed, 20 Jul 2016 18:25:10 GMT
Hi 

I am trying to create a app to consume messages in a round robin fashion.
However only one will work at a time.

Preferably I would like to create two seperate apps to consume from the same
queue.

Here is the code I am trying to use:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.Threading;
using Apache.NMS;
using Apache.NMS.Util;

namespace Consume2Sessions
{
    public partial class Consume2Sessions : Form
    {
        Int32 ConsumeCount1 = 0;
        Int32 ConsumeCount2 = 0;

        Thread ConsumerThread1;
        Thread ConsumerThread2;

        Boolean StopThread1 = false;
        Boolean StopThread2 = false;

        Uri connecturi = null;
        IConnectionFactory factory = null;
        IConnection connection;

        public Consume2Sessions()
        {
            InitializeComponent();

            connecturi = new Uri("activemq:tcp://localhost:61616");
            //            Uri connecturi = new
Uri("activemq:tcp://localhost:61617");


            factory = new NMSConnectionFactory(connecturi);

            connection = factory.CreateConnection();

             connection.Start();
            
        }

        private void btnStartConsumer1_Click(object sender, EventArgs e)
        {
            ConsumerThread1 = new Thread(ConsumeThread1);
            ConsumerThread1.Start();
        }

        private void btnStopConsumer1_Click(object sender, EventArgs e)
        {
            StopThread1 = true;
        }

        private void btnStartConsumer2_Click(object sender, EventArgs e)
        {
            ConsumerThread2 = new Thread(ConsumeThread2);
            ConsumerThread2.Start();
        }

        private void btnStopConsumer2_Click(object sender, EventArgs e)
        {
            StopThread2 = true;
        }

        void ConsumeThread1()
        {


            //using (IConnection connection = factory.CreateConnection())
            //{
                

                using (ISession session = connection.CreateSession())
                {

                    IDestination destination =
SessionUtil.GetDestination(session,
"queue://BIG.MONKEY?consumer.prefetchSize=1");

                    using (IMessageConsumer consumer =
session.CreateConsumer(destination))
                    {
                        
                        // Start the connection so that messages will be
processed.
                   //     connection.Start();

                        while (!StopThread1)
                        {
                            try
                            {

                                ITextMessage message = consumer.Receive() as
ITextMessage;
                                // ProcessedCount++; 
                                ConsumeCount1++;
                                // Thread.Sleep(1000);
                            }
                            catch (Exception ss)
                            { }
                        }

                    }


                }
            //}
            StopThread1 = false;
        }

        void ConsumeThread2()
        {

           // using (IConnection connection = factory.CreateConnection())
            using (ISession session = connection.CreateSession())
            {

                IDestination destination =
SessionUtil.GetDestination(session,
"queue://BIG.MONKEY?consumer.prefetchSize=1");

                using (IMessageConsumer consumer =
session.CreateConsumer(destination))
                {
                    // Start the connection so that messages will be
processed.
                    connection.Start();

                    while (!StopThread2)
                    {
                        try
                        {

                            ITextMessage message = consumer.Receive() as
ITextMessage;
                            // ProcessedCount++; 
                            ConsumeCount2++;
                            // Thread.Sleep(1000);
                        }
                        catch (Exception dd)
                        {


                        }
                    }

                }
            }





            StopThread2 = false;
        }

        private void timer1_Tick(object sender, EventArgs e)
        {
            lblStatus1.Text = ConsumeCount1.ToString();
            lblStatus2.Text = ConsumeCount2.ToString();
        }
        
    }
}




--
View this message in context: http://activemq.2283324.n4.nabble.com/Problem-Consuming-from-queue-with-more-than-one-consumer-tp4714196.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
View raw message