qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ignacio Ybarra <ignacio_yba...@hotmail.com>
Subject RE: Last Value Queue LVQ using dotnet API / last value cache
Date Tue, 27 Oct 2009 17:27:07 GMT

The example code works fine but it is on C++. I need the .net version to do the same and found
the problem to be in the ClientSession.cs:

 

public void messageSubscribe(String queue)

{

messageSubscribe(queue, queue, MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED,
null, 0, null);

// issue credits 

messageSetFlowMode(queue, MessageFlowMode.WINDOW);

messageFlow(queue, MessageCreditUnit.BYTE, ClientSession.MESSAGE_FLOW_MAX_BYTES);

messageFlow(queue, MessageCreditUnit.MESSAGE, 10000); 

}

 

Created a new signature and used MessageAcquireMode.NOT_ACQUIRED instead - it works now. 

 

One other thing I noticed is the MessageCreditUnit.MESSAGE set to 10000 - reason why my .net
client prototype was not receiving more than 10K messages per session... maybe this value
should be configurable...?

 

Ig


 
> Date: Tue, 27 Oct 2009 10:45:19 -0400
> From: cctrieloff@redhat.com
> To: users@qpid.apache.org
> Subject: Re: Last Value Queue LVQ using dotnet API / last value cache
> 
> 
> I had posted a code example here on LVQ http://qpid.apache.org/lvq.html 
> a while ago which I believe covers your question also
> 
> Carl.
> 
> 
> 
> Ignacio Ybarra wrote:
> > Ok - third time. Let's hope the email content works on "rich text"...
> >
> > 
> >
> > Tried the following with no success:
> >
> > 
> >
> > session.attachMessageListener(this, "myDest");
> > session.messageSubscribe("q1", "myDest", MessageAcceptMode.EXPLICIT, MessageAcquireMode.NOT_ACQUIRED,
null, 0, null);
> >
> > 
> >
> > Am I missing something like this?
> >
> > 
> >
> > //session.messageSetFlowMode("myDest",MessageFlowMode.WINDOW);
> > //session.messageFlow("myDest", MessageCreditUnit.BYTE,ClientSession.MESSAGE_FLOW_MAX_BYTES);
> > //session.sync();
> >
> >
> > The following works, but I can't set MessageAcquireMode.NOT_ACQUIRED using these
methods:
> >
> > 
> >
> > session.attachMessageListener(this,"q1");
> > session.messageSubscribe("q1");
> >
> > 
> >
> > (Both snippets above are followed by: while (true) { Console.WriteLine("sleeping");Thread.Sleep(1000);}
)
> >
> > 
> > 
> >> From: jbird@microsoft.com
> >> To: users@qpid.apache.org
> >> Subject: RE: Last Value Queue LVQ using dotnet API / last value cache
> >> Date: Tue, 27 Oct 2009 01:25:31 +0000
> >>
> >> I just had to figure this out myself. MessageAcquireMode.PRE_ACQUIRED is exactly
what you DON'T want, because acquiring a message removes it from the queue. NOT_ACQUIRED will
leave it in the queue.
> >>
> >> -----Original Message-----
> >> From: Ignacio Ybarra [mailto:ignacio_ybarra@hotmail.com] 
> >> Sent: Monday, October 26, 2009 5:44 PM
> >> To: users@qpid.apache.org
> >> Subject: Last Value Queue LVQ using dotnet API / last value cache
> >>
> >>
> >> Hello, 
> >>
> >> I have been trying to set up a Last Value Queue using your dotnet API (client0-10).
Most of it works great -- apart from consuming messages without taking them from the queue
(peek).
> >>
> >> Basically I want the consumer/listener to receive by not remove the msg from
the lvq, i.e. the setup to work as a last value cache.
> >>
> >> Since I couldn't find parameters/methods to set this up using the interface,
I have taken the steps below.
> >>
> >> The setup part of my code looks like this:
> >>
> >> session.exchangeDeclare("ig.direct", "direct", null, null, new Option[] { Option.DURABLE
});
> >> session.queueDeclare("q1",null,new Dictionary<string,object>(){{"qpid.last_value_queue",1}},
new Option[] { Option.DURABLE });
> >> session.exchangeBind("q1", "ig.direct", "routing_key");
> >>
> >> The sender/publisher part of the code looks like this (in reality I send a batch
of msgs so they get overwritten):
> >> //List<IMessage> messages = new List<IMessage>() ;
> >> //for (int i = 0; i < Program.NUM_MSGS; i++)
> >> //{
> >> IMessage message = new Message();
> >> message.DeliveryProperties.setRoutingKey("routing_key");
> >> message.DeliveryProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
> >> message.appendData(Encoding.UTF8.GetBytes("Message "));// + i));
> >> message.ApplicationHeaders.Add("qpid.LVQ_key", "INTC");
> >> //messages.Add(message)
> >> //}
> >> session.messageTransfer("ig.direct", message);//s[i]);
> >>
> >>
> >> The receiver/listener looks like this:
> >>
> >> session.attachMessageListener(this, "q1");
> >> session.messageSubscribe("q1");
> >> //public void messageTransfer(IMessage m)
> >> //{
> >> //...
> >> // Console.WriteLine("Message: " + message);
> >> //}
> >>
> >> All works with this setup, but the message is removed from the queue.
> >>
> >> I have also tried the following in the listener, but in this case the callback
does not get any messages:
> >>
> >> session.attachMessageListener(this, "myDest");
> >> session.messageSubscribe("q1", "myDest", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED,
null, 0, null);
> >>
> >> Any clues? Many thanks in advance.- ig
> >>
> >> ---------------------------------------------------------------------
> >> Apache Qpid - AMQP Messaging Implementation
> >> Project: http://qpid.apache.org
> >> Use/Interact: mailto:users-subscribe@qpid.apache.org
> >>
> >>
> >>
> >> ---------------------------------------------------------------------
> >> Apache Qpid - AMQP Messaging Implementation
> >> Project: http://qpid.apache.org
> >> Use/Interact: mailto:users-subscribe@qpid.apache.org
> >>
> >> 
> > 
> > 
> 
 		 	   		  
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message