qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Anthony Foglia <afog...@princeton.com>
Subject Queues that exist only when there are receivers
Date Thu, 11 Aug 2011 14:18:09 GMT
We're trying to create a topology where we have an exchange with a
queue bound to it, such that while there are receivers attached to the
queue, the queue exists, but when there are no receivers the queue
goes away.  (The exchange has an alternate exchange which sends the
message across a federated queue to another broker.)

We are creating the receiver with an address "binding_bug_queue; {
create: always, node: { type: queue, durable: False, x-declare: {
exclusive: False, auto-delete: True } }, link: { durable: False,
x-bindings: [ { exchange : 'binding_bug_exchange', key: 'init' } ] }
}".

With the C++ library (trunk version), if there are two receivers in
different processes, then when either closes, the binding is removed.

But with the Python library (RHEL package 0.7.946106-15.el6), when
either of the two processes closes, the binding stays.

Also, since we are using the SWIG bindings, I tried those and
discovered that if I do not properly close the receiver, the session,
and the connection objects, the binding survives the end of one of the
receiving processes, but not of both (i.e., it works as desired).

Which library is doing the correct thing?  Is our address correct?

Below I have pasted my two test programs.


-- 
Anthony Foglia
Princeton Consultants
(609) 987-8787 x233



----- binding_test.cpp -----
#include <iostream>
#include <string>
#include <cstdlib>

#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Receiver.h>


// Runs `qpid-config -b exchanges` through the shell and lets grep print each
// line matching `exchange` plus the one line after it (-A 1).  The output
// goes straight to this process's stdout; the exit status is ignored.
void print_first_binding(const std::string & exchange) {
  std::string shell_command("qpid-config -b exchanges | grep -A 1 ");
  shell_command += exchange;
  std::system(shell_command.c_str());
}


int main() {
  using std::string;
  using std::cout;
  using std::endl;
  using qpid::messaging::Connection;
  using qpid::messaging::Session;
  using qpid::messaging::Receiver;

  string queue = "binding_bug_queue";
  string exchange = "binding_bug_exchange";
  string key = "init";

  print_first_binding(exchange);

  cout << "Opening connection" << endl;
  Connection connection("anthony/princeton@localhost");
  connection.open();

  Session session = connection.createSession();

  string address = queue +"; { create: always, node: { type: queue,
durable: False, x-declare: { exclusive: False, auto-delete: True } },
link: { durable: False, x-bindings: [ { exchange : '"+exchange+"',
key: '"+key+"' } ] } }";

  print_first_binding(exchange);

  cout << "Creating receiver to address: " << address << endl;
  Receiver receiver = session.createReceiver(address);

  print_first_binding(exchange);

  cout << "Press any key to quit: ";
  char c;
  std::cin >> c;

  cout << "Getting bindings one last time" << endl;
  print_first_binding(exchange);

  cout << "Closing" << endl;

  receiver.close();
  session.close();
  connection.close();

  print_first_binding(exchange);
}

----- binding_test.py -----
#!/usr/bin/env python

import os
import time
import optparse
import subprocess

# Choose the messaging backend.  The QPID_API environment variable selects it
# (compared case-insensitively); when unset it is treated as "swig".
# Whichever module gets imported is bound to the name `messaging`.
# Each print() takes one string, so the output is the same whether print is
# a statement (Python 2) or a function.
if os.environ.get("QPID_API", "swig").lower() != "swig" :
  print("Using python API")
  import qpid.messaging as messaging
else :
  try :
    import cqpid as messaging
  except ImportError :
    # The SWIG wrapper could not be imported: use the pure-Python client.
    print("Error using swig.  Falling back to python api")
    import qpid.messaging as messaging
  else :
    print("Using swig api")



def print_bindings(exchange_name) :
  """Print the part of `qpid-config -b exchanges` output for one exchange.

  Runs `qpid-config -b exchanges` and scans its standard output line by
  line.  A line starting with "Exchange " switches printing on when it
  contains the quoted exchange name ('<exchange_name>') and off otherwise;
  every line seen while printing is on is echoed with trailing whitespace
  removed.

  exchange_name -- name of the exchange whose section should be printed.
  """
  proc = subprocess.Popen(["qpid-config", "-b", "exchanges"],
                          stdout=subprocess.PIPE,
                          # Text-mode output, so the str comparisons below
                          # also hold where pipes yield bytes by default.
                          universal_newlines=True)
  # communicate() reads the pipe to EOF while waiting for the child.
  # Calling wait() before reading can deadlock: once the child has filled
  # the OS pipe buffer it blocks on write and never exits.
  output, _ = proc.communicate()
  on_exchange = False
  for lyne in output.splitlines() :
    if lyne.startswith("Exchange ") :
      on_exchange = ("'%s'" % exchange_name) in lyne
    if on_exchange :
      # Single-argument print() behaves the same as the print statement.
      print(lyne.rstrip())


if __name__=="__main__" :
  # Test driver: attach one receiver to "binding_bug_queue" (bound to
  # "binding_bug_exchange" with key "init"), idle until Ctrl-C, then either
  # close receiver/session/connection one at a time ("clean") or leave them
  # unclosed ("dirty").  print_bindings() is called around each step so the
  # exchange's bindings can be compared by eye.
  #
  # Every print() below takes exactly one string, so the output is the same
  # whether print is a statement (Python 2) or a function.
  parser = optparse.OptionParser("%prog [OPTIONS]")
  # NOTE(review): --broker has no default, so options.broker is None when the
  # flag is omitted -- confirm Connection.establish() accepts that.
  parser.add_option("-b", "--broker")
  parser.add_option("--close", type="choice", choices=("clean", "dirty"),
                    default="clean",
                    help="Clean closing of qpid objects [default: %default]")
  options, args = parser.parse_args()

  queue_name = "binding_bug_queue"
  exchange_name = "binding_bug_exchange"
  key = "init"

  print_bindings(exchange_name)

  print("Connecting")
  connection = messaging.Connection.establish(options.broker)

  session = connection.session()

  # One logical string, written as adjacent literals inside parentheses so
  # that no literal contains a raw line break (a quoted string split across
  # physical lines is a SyntaxError).  Doubled braces are literal braces for
  # str.format(); only {queue}, {exchange} and {key} are substituted.
  address = (
    "{queue}; {{ create: always, node: {{ type: queue, "
    "durable: False, x-declare: {{ exclusive: False, auto-delete: True }} "
    "}}, link: {{ durable: False, x-bindings: [ {{ exchange: '{exchange}', "
    "key: '{key}' }} ] }} }}"
  ).format(queue=queue_name, exchange=exchange_name, key=key)

  print("Creating receiver to address %s" % address)
  receiver = session.receiver(address)

  print_bindings(exchange_name)

  # Idle until interrupted so the receiver stays attached.
  try :
    while True :
      time.sleep(1)
  except KeyboardInterrupt :
    print("Exiting")

  # With --close=dirty nothing is closed explicitly; the process just ends.
  if options.close == "clean" :
    print("Closing receiver")
    receiver.close()
    print_bindings(exchange_name)

    print("Closing session")
    session.close()
    print_bindings(exchange_name)

    print("Closing connection")
    connection.close()
    print_bindings(exchange_name)

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Mime
View raw message