qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Steve Huston" <shus...@riverace.com>
Subject RE: c# topic publisher and listener sessions dropped
Date Thu, 14 Apr 2011 11:49:57 GMT
There was some recent discussion very similar to this on the group. Are
you by any chance using the pre-0.10 .NET client? If so, I recommend you
stop, wait for Qpid 0.10 (which should be released shortly) and use the
new .NET bindings.

-Steve

--
Steve Huston, Riverace Corporation
Total Lifecycle Support for Your Networked Applications
http://www.riverace.com


> -----Original Message-----
> From: fcolle [mailto:fcolle@tiscali.it] 
> Sent: Thursday, April 14, 2011 6:02 AM
> To: users@qpid.apache.org
> Subject: c# topic publisher and listener sessions dropped
> 
> 
> Hello!
>  I'm using the C# client to connect to a qpid C++ broker for 
> publishing and receiving messages on a topic (amq.topic). 
> Unfortunately, both publisher and listener 
> sessions/connections are dropped after 3-5 minutes. No 
> exceptions or events are generated by the c# client, so I 
> wonder how to handle this situation: I want the listener to 
> be connected forever to its topic. I've tried to change the 
> connection timeout, but with no success.
>  
> I created a simple publisher and a listener for reproducing 
> the problem. The first c# console application publishes the 
> lines read from the console, while the second application 
> reads from the topic.
> 
> After a few minutes the two parts are not connected anymore 
> and using the 'list queue' command in the qpid-tool utility I 
> see that the queue has been destroyed.
> 
> Any help is appreciated,
> bye Fcolle
> 
> 
> /*************************************************************
> ********************/
> //-----------------------------------------------
> // PUBLISHER CODE
> //-----------------------------------------------
> 
> using System;
> using System.Configuration;
> using System.IO;
> using System.Text;
> using System.Threading;
> using org.apache.qpid.client;
> using org.apache.qpid.transport;
> 
> namespace Publisher
> {
> 
>    /// <summary>
>    /// This program is one of two programs designed to be used
>    /// together. These programs use the topic exchange.
>    ///
>    /// Publisher (this program):
>    ///
>    /// Publishes to a broker, specifying a routing key.
>    ///
>    /// Listener:
>    ///
>    /// Reads from a queue on the broker using a message listener.
>    /// </summary>
>    /// <remarks>
>    /// The class is still called Listener, the name it was posted
>    /// with, even though it is the publishing side.
>    /// </remarks>
>    class Listener
>    {
>        // Not read or written anywhere in this class.
>        public static int _count = 1;
> 
>        /// <summary>
>        /// Connects using the values in PublisherSettings, publishes
>        /// console input to amq.topic, sends the final "End" message
>        /// on the "control" key and closes the connection.
>        /// </summary>
>        /// <param name="args">Not used.</param>
>        private static void Main(string[] args)
>        {
>            string host = PublisherSettings.Default.Host;
>            int port = PublisherSettings.Default.Port;
>            string virtualhost = PublisherSettings.Default.VirtualHost;
>            string username = PublisherSettings.Default.Username;
>            string password = PublisherSettings.Default.Password;
> 
>            Client connection = new Client();
>            try
>            {
>                connection.Connect(
>                    host, port, virtualhost, username, password);
>                try
>                {
>                    IClientSession session =
>                        connection.CreateSession(50000);
> 
>                    publishMessages(session, "myqueue.1");
> 
>                    noMoreMessages(session);
>                }
>                finally
>                {
>                    // Close even when publishing throws, so an
>                    // established connection is never left open.
>                    connection.Close();
>                }
>            }
>            catch (Exception e)
>            {
>                // Concatenating the exception itself prints its type
>                // and message in addition to the stack trace.
>                Console.WriteLine("Error: \n" + e);
>            }
>        }
> 
>        /// <summary>
>        /// Publishes each console line to amq.topic with the given
>        /// routing key. Stops after publishing the line "End", or
>        /// when console input is exhausted.
>        /// </summary>
>        /// <param name="session">Session used for the transfers.</param>
>        /// <param name="routing_key">Routing key of every message.</param>
>        private static void publishMessages(IClientSession session,
>            string routing_key)
>        {
>            string read;
>            do
>            {
>                read = Console.ReadLine();
>                if (read == null)
>                {
>                    // ReadLine returns null at end of input; there
>                    // is nothing to encode or send in that case.
>                    break;
>                }
> 
>                IMessage message = new Message();
>                message.ClearData();
>                message.AppendData(Encoding.UTF8.GetBytes(read));
>                session.MessageTransfer(
>                    "amq.topic", routing_key, message);
>            } while (read != "End");
>        }
> 
>        /// <summary>
>        /// Sends a final "End" message with the "control" routing
>        /// key and then calls Sync on the session.
>        /// </summary>
>        /// <param name="session">Session used for the transfer.</param>
>        private static void noMoreMessages(IClientSession session)
>        {
>            IMessage message = new Message();
>            // And send a synchronous final message to indicate
>            // termination.
>            message.ClearData();
>            message.AppendData(Encoding.UTF8.GetBytes("End"));
>            session.MessageTransfer("amq.topic", "control", message);
>            session.Sync();
>        }
>    }
> }
> 
> 
> 
> /*************************************************************
> ********************/
> //-----------------------------------------------
> // LISTENER CODE
> //-----------------------------------------------
> using System;
> using System.Configuration;
> using System.IO;
> using System.Text;
> using System.Threading;
> using org.apache.qpid.client;
> using org.apache.qpid.transport;
> 
> namespace Subscriber
> {
> 
>    /// <summary>
>    /// This program is one of two programs designed to be used
>    /// together. These programs use the topic exchange.
>    ///
>    /// Publisher:
>    ///
>    /// Publishes to a broker, specifying a routing key.
>    ///
>    /// Listener (this program):
>    ///
>    /// Reads from a queue on the broker using a message listener.
>    /// </summary>
>    class Listener
>    {
>        // Main waits while this is above zero; MessageListener
>        // decrements it when it receives "End".
>        public static int _count = 1;
> 
>        /// <summary>
>        /// Connects using the values in Settings1, subscribes to
>        /// "myqueue" and blocks until _count drops to zero, then
>        /// closes the connection.
>        /// </summary>
>        /// <param name="args">Not used.</param>
>        private static void Main(string[] args)
>        {
>            string host = Settings1.Default.Host;
>            int port = Settings1.Default.Port;
>            string virtualhost = Settings1.Default.VirtualHost;
>            string username = Settings1.Default.Username;
>            string password = Settings1.Default.Password;
> 
>            Client connection = new Client();
>            try
>            {
>                connection.Connect(
>                    host, port, virtualhost, username, password);
>                try
>                {
>                    IClientSession session =
>                        connection.CreateSession(50000);
> 
>                    //--------- Main body of program ---------
> 
>                    // The session object doubles as the monitor that
>                    // MessageListener pulses once "End" has arrived.
>                    lock (session)
>                    {
>                        Console.WriteLine("Listening for messages ...");
>                        // Create a listener
>                        prepareQueue("myqueue", "myqueue.#", session);
>                        while (_count > 0)
>                        {
>                            Monitor.Wait(session);
>                        }
>                    }
> 
>                    //----------------------------------------
>                }
>                finally
>                {
>                    // Close even when the body throws, so an
>                    // established connection is never left open.
>                    connection.Close();
>                }
>            }
>            catch (Exception e)
>            {
>                // Concatenating the exception itself prints its type
>                // and message in addition to the stack trace.
>                Console.WriteLine("Error: \n" + e);
>            }
>        }
> 
>        /// <summary>
>        /// Declares the queue, binds it to amq.topic for the given
>        /// routing key and for "control", and subscribes a
>        /// MessageListener to it.
>        /// </summary>
>        /// <param name="queue">Name of the queue to declare.</param>
>        /// <param name="routing_key">Binding key for data messages.</param>
>        /// <param name="session">Session used for all commands.</param>
>        private static void prepareQueue(string queue,
>            string routing_key, IClientSession session)
>        {
>            // The queue name is used exactly as passed in and is
>            // declared exclusive and auto-delete.
>            // NOTE(review): an auto-delete queue is presumably removed
>            // by the broker once this subscriber goes away - confirm.
>            Console.WriteLine("Declaring queue: " + queue);
>            session.QueueDeclare(
>                queue, Option.EXCLUSIVE, Option.AUTO_DELETE);
> 
>            // Route messages to the new queue if they match the
>            // routing key. Also route any messages with the "control"
>            // routing key to this queue so we know when it's time to
>            // stop. The publisher sends a message with the content
>            // "End", using the "control" routing key, when it is
>            // finished.
> 
>            session.ExchangeBind(queue, "amq.topic", routing_key);
>            session.ExchangeBind(queue, "amq.topic", "control");
> 
>            // subscribe the listener to the queue
>            IMessageListener listener = new MessageListener(session);
>            session.AttachMessageListener(listener, queue);
>            session.MessageSubscribe(queue);
>        }
>    }
> 
>    /// <summary>
>    /// Prints every message it is handed. When a message body is
>    /// "End" it accepts all messages received so far, decrements
>    /// Listener._count and pulses the session monitor.
>    /// </summary>
>    public class MessageListener : IMessageListener
>    {
>        private readonly IClientSession _session;
>        // Ids of every message seen, accepted in one go on "End".
>        private readonly RangeSet _range = new RangeSet();
> 
>        /// <summary>Creates a listener bound to a session.</summary>
>        /// <param name="session">
>        /// Session used to accept messages and as the monitor object.
>        /// </param>
>        public MessageListener(IClientSession session)
>        {
>            _session = session;
>        }
> 
>        /// <summary>
>        /// Decodes the unread part of the body as UTF-8 text, prints
>        /// it, and handles the "End" shutdown message.
>        /// </summary>
>        /// <param name="m">The delivered message.</param>
>        public void MessageTransfer(IMessage m)
>        {
>            BinaryReader reader = new BinaryReader(m.Body,
>                Encoding.UTF8);
>            // ReadBytes keeps reading until the requested count or
>            // the end of the stream, unlike a single Read call which
>            // may return fewer bytes than asked for.
>            int remaining = (int)(m.Body.Length - m.Body.Position);
>            byte[] body = reader.ReadBytes(remaining);
>            // The publisher encodes with UTF-8, so decode the same
>            // way; ASCII decoding would garble non-ASCII characters.
>            string message = Encoding.UTF8.GetString(body);
>            Console.WriteLine(DateTime.Now.ToString()
>                + "\t-\tMessage:\t[" + message + "]\tfrom: "
>                + m.Destination);
>            // Add this message to the list of messages to be
>            // acknowledged
>            _range.Add(m.Id);
>            if (message.Equals("End"))
>            {
>                Console.WriteLine("Shutting down listener for "
>                    + m.DeliveryProperties.GetRoutingKey());
>                // Acknowledge all the received messages
>                _session.MessageAccept(_range);
>                lock (_session)
>                {
>                    // Change the wait condition under the same lock
>                    // Main holds while it checks the condition.
>                    Listener._count--;
>                    Monitor.Pulse(_session);
>                }
>            }
>        }
>    }
> }
> 
> 
> 
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-qpid-users.2158936.n2.nabble.com/c-topic-publish
> er-and-listener-sessions-dropped-tp6272182p6272182.html
> Sent from the Apache Qpid users mailing list archive at Nabble.com.
> 
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:users-subscribe@qpid.apache.org
> 
> 


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Mime
View raw message