qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fcolle <fco...@tiscali.it>
Subject c# topic publisher and listener sessions dropped
Date Thu, 14 Apr 2011 10:02:27 GMT
Hello!
 I'm using the C# client to connect to a Qpid C++ broker for publishing and
receiving messages on a topic (amq.topic).
Unfortunately, both the publisher and the listener sessions/connections are
dropped after 3-5 minutes. No exceptions or events are raised by the C# client,
so I wonder how to handle this situation: I want the listener to stay
connected to its topic indefinitely. I've tried changing the connection timeout,
but without success.
 
I created a simple publisher and a listener to reproduce the problem. The
first C# console application publishes the lines read from the console,
while the second application reads them from the topic.

After a few minutes the two parts are not connected anymore and using the
'list queue' command in the qpid-tool utility I see that the queue has been
destroyed.

Any help is appreciated,
bye Fcolle


/*********************************************************************************/
//-----------------------------------------------
// PUBLISHER CODE
//-----------------------------------------------

using System;
using System.Configuration;
using System.IO;
using System.Text;
using System.Threading;
using org.apache.qpid.client;
using org.apache.qpid.transport;

namespace Publisher
{

   /// <summary>
   /// This program is one of two programs designed to be used
   /// together. These programs use the topic exchange.
   ///
   /// Publisher (this program):
   ///
   /// Publishes to a broker, specifying a routing key.
   ///
   /// Listener:
   ///
   /// Reads from a queue on the broker using a message listener.
   /// </summary>
   /// <remarks>
   /// The class is named <c>Listener</c> although it lives in the
   /// <c>Publisher</c> namespace and only publishes; the name is kept
   /// unchanged so existing references keep working.
   /// </remarks>
   class Listener
   {
       // Not read or written by the publisher itself; kept because it is public.
       public static int _count = 1;

       /// <summary>
       /// Connects to the broker configured in <c>PublisherSettings</c>,
       /// publishes every console line to "amq.topic" with routing key
       /// "myqueue.1", sends the final "End" control message and closes
       /// the connection.
       /// </summary>
       /// <param name="args">Command line arguments (unused).</param>
       private static void Main(string[] args)
       {
           string host = PublisherSettings.Default.Host;
           int port = PublisherSettings.Default.Port;
           string virtualhost = PublisherSettings.Default.VirtualHost;
           string username = PublisherSettings.Default.Username;
           string password = PublisherSettings.Default.Password;

           Client connection = new Client();
           // Only a connection that was actually established is closed.
           bool connected = false;
           try
           {
               connection.Connect(host, port, virtualhost, username, password);
               connected = true;

               // NOTE(review): 50000 is presumably the session expiry - confirm the unit.
               IClientSession session = connection.CreateSession(50000);

               publishMessages(session, "myqueue.1");

               noMoreMessages(session);
           }
           catch (Exception e)
           {
               // Print the whole exception (type, message and stack trace);
               // the stack trace alone does not say what went wrong.
               Console.WriteLine("Error: \n" + e);
           }
           finally
           {
               if (connected)
               {
                   try
                   {
                       // Close on the failure path too, so the connection is not leaked.
                       connection.Close();
                   }
                   catch (Exception e)
                   {
                       Console.WriteLine("Error: \n" + e);
                   }
               }
           }
       }

       /// <summary>
       /// Publishes each line read from the console as a UTF-8 encoded
       /// message to "amq.topic". The loop ends after the line "End" has
       /// been published, or when the console input is exhausted.
       /// </summary>
       /// <param name="session">Session used to transfer the messages.</param>
       /// <param name="routing_key">Routing key attached to every message.</param>
       private static void publishMessages(IClientSession session, string routing_key)
       {
           string read;
           do
           {
               read = Console.ReadLine();
               if (read == null)
               {
                   // End of input (e.g. redirected stdin): there is nothing to
                   // encode, and GetBytes(null) would throw.
                   break;
               }

               IMessage message = new Message();
               message.ClearData();
               message.AppendData(Encoding.UTF8.GetBytes(read));
               session.MessageTransfer("amq.topic", routing_key, message);
           } while (read != "End");
       }

       /// <summary>
       /// Sends the final "End" message with the "control" routing key to
       /// indicate termination, then calls <c>Sync()</c> on the session.
       /// </summary>
       /// <param name="session">Session used to transfer the message.</param>
       private static void noMoreMessages(IClientSession session)
       {
           IMessage message = new Message();
           // And send a synchronous final message to indicate termination.
           message.ClearData();
           message.AppendData(Encoding.UTF8.GetBytes("End"));
           session.MessageTransfer("amq.topic", "control", message);
           // Presumably waits until the broker has processed the transfer
           // before the caller closes the connection - confirm against the client API.
           session.Sync();
       }
   }
}



/*********************************************************************************/
//-----------------------------------------------
// LISTENER CODE
//-----------------------------------------------
using System;
using System.Configuration;
using System.IO;
using System.Text;
using System.Threading;
using org.apache.qpid.client;
using org.apache.qpid.transport;

namespace Subscriber
{

   /// <summary>
   /// This program is one of two programs designed to be used
   /// together. These programs use the topic exchange.
   ///
   /// Publisher:
   ///
   /// Publishes to a broker, specifying a routing key.
   ///
   /// Listener (this program):
   ///
   /// Reads from a queue on the broker using a message listener.
   /// </summary>
   class Listener
   {
       // Number of "End" messages still expected; decremented by
       // MessageListener, Main keeps waiting while it is positive.
       public static int _count = 1;

       /// <summary>
       /// Connects to the broker configured in <c>Settings1</c>, subscribes
       /// to "myqueue" and blocks until the message listener reports that
       /// the final "End" message has arrived, then closes the connection.
       /// </summary>
       /// <param name="args">Command line arguments (unused).</param>
       private static void Main(string[] args)
       {
           string host = Settings1.Default.Host;
           int port = Settings1.Default.Port;
           string virtualhost = Settings1.Default.VirtualHost;
           string username = Settings1.Default.Username;
           string password = Settings1.Default.Password;

           Client connection = new Client();
           // Only a connection that was actually established is closed.
           bool connected = false;
           try
           {
               connection.Connect(host, port, virtualhost, username, password);
               connected = true;

               // NOTE(review): 50000 is presumably the session expiry - confirm the unit.
               IClientSession session = connection.CreateSession(50000);

               //--------- Main body of program --------------------------------------------

               // The session object doubles as the monitor shared with
               // MessageListener, which pulses it when "End" arrives. The lock
               // is taken before subscribing, so that pulse cannot happen
               // before this thread is inside Monitor.Wait.
               lock (session)
               {
                   Console.WriteLine("Listening for messages ...");
                   // Create a listener
                   prepareQueue("myqueue", "myqueue.#", session);
                   while (_count > 0)
                   {
                       Monitor.Wait(session);
                   }
               }

               //---------------------------------------------------------------------------
           }
           catch (Exception e)
           {
               // Print the whole exception (type, message and stack trace);
               // the stack trace alone does not say what went wrong.
               Console.WriteLine("Error: \n" + e);
           }
           finally
           {
               if (connected)
               {
                   try
                   {
                       // Close on the failure path too, so the connection is not leaked.
                       connection.Close();
                   }
                   catch (Exception e)
                   {
                       Console.WriteLine("Error: \n" + e);
                   }
               }
           }
       }

       /// <summary>
       /// Declares the queue, binds it to "amq.topic" for the given routing
       /// key and for the "control" key, and subscribes a
       /// <c>MessageListener</c> to it.
       /// </summary>
       /// <param name="queue">Name of the queue to declare and subscribe to.</param>
       /// <param name="routing_key">Binding key for the messages of interest.</param>
       /// <param name="session">Session used for all broker commands.</param>
       private static void prepareQueue(string queue, string routing_key, IClientSession session)
       {
           // The queue name is used exactly as given and is declared
           // exclusive and auto-delete.
           // NOTE(review): with AUTO_DELETE the broker presumably removes the
           // queue once this consumer is gone - confirm against the broker docs.
           Console.WriteLine("Declaring queue: " + queue);
           session.QueueDeclare(queue, Option.EXCLUSIVE, Option.AUTO_DELETE);

           // Route messages to the new queue if they match the routing key.
           // Also route any messages with the "control" routing key to
           // this queue so we know when it's time to stop. A publisher sends
           // a message with the content "End", using the
           // "control" routing key, when it is finished.

           session.ExchangeBind(queue, "amq.topic", routing_key);
           session.ExchangeBind(queue, "amq.topic", "control");

           // subscribe the listener to the queue
           IMessageListener listener = new MessageListener(session);
           session.AttachMessageListener(listener, queue);
           session.MessageSubscribe(queue);
       }
   }

   /// <summary>
   /// Prints every received message to the console and, when a message
   /// with the content "End" arrives, acknowledges everything received so
   /// far and wakes up the thread waiting on the session monitor.
   /// </summary>
   public class MessageListener : IMessageListener
   {
       // Session used to acknowledge messages; also the monitor object the
       // main thread waits on.
       private readonly IClientSession _session;
       // Ids of all received messages, accepted in one go when "End" arrives.
       private readonly RangeSet _range = new RangeSet();

       /// <summary>Creates a listener bound to the given session.</summary>
       /// <param name="session">Session the messages are received on.</param>
       public MessageListener(IClientSession session)
       {
           _session = session;
       }

       /// <summary>
       /// Handles one incoming message: prints its text and destination,
       /// records its id for acknowledgement and performs the shutdown
       /// handshake if the text is "End".
       /// </summary>
       /// <param name="m">The received message.</param>
       public void MessageTransfer(IMessage m)
       {
           BinaryReader reader = new BinaryReader(m.Body, Encoding.UTF8);
           // ReadBytes keeps reading until the requested count or the end of
           // the stream, whereas a single Read call may return fewer bytes.
           byte[] body = reader.ReadBytes((int)(m.Body.Length - m.Body.Position));
           // The publisher encodes the text as UTF-8; decoding it as ASCII
           // would turn every non-ASCII character into '?'.
           string message = Encoding.UTF8.GetString(body);
           Console.WriteLine(DateTime.Now.ToString() + "\t-\tMessage:\t[" + message + "]\tfrom: " + m.Destination);
           // Add this message to the list of message to be acknowledged
           _range.Add(m.Id);
           if (message.Equals("End"))
           {
               Console.WriteLine("Shutting down listener for " + m.DeliveryProperties.GetRoutingKey());
               // Acknowledge all the received messages
               _session.MessageAccept(_range);
               lock (_session)
               {
                   // Update the shared counter under the same monitor the
                   // main thread holds when it reads it, then wake it up.
                   Listener._count--;
                   Monitor.Pulse(_session);
               }
           }
       }
   }
}








--
View this message in context: http://apache-qpid-users.2158936.n2.nabble.com/c-topic-publisher-and-listener-sessions-dropped-tp6272182p6272182.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Mime
View raw message