qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Nithesh Shetty <knitheshshe...@yahoo.com>
Subject Re: Async get
Date Sat, 18 Jul 2009 04:14:35 GMT
 
HI,
 
I have added wait before close in the below mentioned code.
 
    subscriptions.stop();
    subscriptions.wait();
    session.close();

But in windows it gives the same error 
access violation reading location 0x00000
with below mentioned call stack.
 
on linux it works fine.
 
Awaiting your reply.
 
regards,
Nithesh
 


--- On Fri, 7/17/09, Nithesh Shetty <knitheshshetty@yahoo.com> wrote:


From: Nithesh Shetty <knitheshshetty@yahoo.com>
Subject: Async get
To: users@qpid.apache.org
Date: Friday, July 17, 2009, 10:08 AM


Hi
 
I have created three thread two threads read asynchronoulsy from the queue and one thread
writes to the queue..
 
below is the code.
 
class listener : public MesageListener
{
 // constructor
// received function;
};
 
unsigned int __stdcall ReadThreadProc(void *)
{
      const char* host = "172.20.112.111"
      int port = 5004;
      try
     {
           Connection connection;
           ConnectionSettings lcConnSettings;
           lcConnSettings.host = host;
           lcConnSettings.port = port;
           lcConnSettings.maxFrameSize = 2048;
           lcConnSettings.username = "guest";
           lcConnSettings.password = "guest";
           connection.open(lcConnSettings);
           Session session = connection.newSession();
 
          QueueOrderingPolicy cQOP = FIFO;
          QueueOptions lcQueueOptions;
          lcQueueOptions.setOrdering(cQOP);
          session.exchangeDeclare(arg::exchange = "QM", arg::exclusive = false; arg:autoDelete
= false; arg::type = "direct", );
          session.queueDeclare(arg::queue = "RMS", arg::exclusive = false, arg::qutoDelete
= false, arg::arguments = lcQueueOptions);
         session.exchangeBind(arg::exchange = "QM", arg::queue = "RMS", arg::bindingKey
= "QM_ROUTE_KEY");
 
     SubsriptionManager subscriptions(session);
     Listener listener(subscriptions);
    subscription.subscribe(listener, "RMS");
    subscription.start();
 
    Sleep(10000);
    subscriptions.stop();
    session.close();
    connection.close();
 
     }
    catch(const exception &error )
  {
    cout << error.wha() << endl;
 }
   return 0;
  
}
 
 
unsigned int __stdcall WriteThreadProc(void *)
{
      const char* host = "172.20.112.111"
      int port = 5004;
      try
     {
           Connection connection;
           ConnectionSettings lcConnSettings;
           lcConnSettings.host = host;
           lcConnSettings.port = port;
           lcConnSettings.maxFrameSize = 2048;
           lcConnSettings.username = "guest";
           lcConnSettings.password = "guest";
           connection.open(lcConnSettings);
           Session session = connection.newSession();
 
          QueueOrderingPolicy cQOP = FIFO;
          QueueOptions lcQueueOptions;
          lcQueueOptions.setOrdering(cQOP);
          session.exchangeDeclare(arg::exchange = "QM", arg::exclusive = false; arg:autoDelete
= false; arg::type = "direct", );
          session.queueDeclare(arg::queue = "RMS", arg::exclusive = false, arg::qutoDelete
= false, arg::arguments = lcQueueOptions);
         session.exchangeBind(arg::exchange = "QM", arg::queue = "RMS", arg::bindingKey
= "QM_ROUTE_KEY");
  
    string buffer("Hello from 0676 ");
   string lastdata(256, Q);
    string target("QM");
    string routeKey("QM_ROUTE_KEY");
    Message message'
    message.getDeliveryProperties().setRouteKey(routeKey);
    for(int i = 0; i < 100; ++i)
{
    message.setData(buffer);
    async(session).messageTransfer(arg::content=message, arg::destination=target, arg::acceptMode=message::ACCEPT_MODE_NONE, 
arg::acquireMode=message::
 ACQUIRE_MODE_PRE_ACQUIRED, arg::sync = false);
 
}
    message.setData(lastdata);
    async(session).messageTransfer(arg::content=message, arg::destination=target, arg::acceptMode=message::ACCEPT_MODE_NONE, 
arg::acquireMode=message::
 ACQUIRE_MODE_PRE_ACQUIRED, arg::sync = false);  
 
session.sync();
 session.close();
 connection.close()
 
}
catch(const exception & error)
{
   const << error.what();
}
 
return 0;
}
 
int main()
{
 thread1(readThreadproc)
thread2(writethreadproc);
thread3(readThreadProc);
}
 
Now the problem  is it give exception if u check the call stack then 
it show ptr = 0x00000000 line 664
qpid::sys::SystemInfo::getProcessName() 
qpid::client::ConnectionHandler::ConnectionHandle()
qpid::client::ConnectionImpl::ConnectionImp()
qpid::client:Connection::open() ( open connection is from read thread)
 
tested the above code on windows
 
Awaiting your reply.
 
Regards,
Nithesh
 
 
 
 
 
 
 
 
 
 
 
 
 
 


      


      
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message