activemq-dev mailing list archives

From Matevz Tadel <matevz.ta...@cern.ch>
Subject cms::ExceptionListener::onException() and what one can do there
Date Fri, 08 Mar 2013 02:16:23 GMT
Hi,

I'm using a single connection to an AMQ topic in a multi-threaded program that does many other
things beyond sending data over AMQ. So, when AMQ invokes onException(), I really do not
want to exit the application (all demos seem to have "// exit(1);" in this handler :) ) and
would rather reinitialize my single connection, even if this means leaking some resources.

I have been getting exceptions like this since switching to 3.5.0 (but the load on the application
is also getting higher, so the cause doesn't have to be a change introduced in 3.5.0):
2013-03-06 01:51:51 ERR XrdFileCloseReporterAmq::onException Exception callback invoked:
    DataInputStream::readLong - Reached EOF
and the application now crashes when I close the connection (previously, closing it triggered
a true exception in the application thread that makes the AMQ calls).

Please see my connect and onException() implementations below. Full class is available here:
http://www.gled.org/viewvc/gled/trunk/libsets/XrdMon/Glasses/XrdFileCloseReporterAmq.h?revision=2860&view=markup
http://www.gled.org/viewvc/gled/trunk/libsets/XrdMon/Glasses/XrdFileCloseReporterAmq.cxx?revision=2862&view=markup

I guess the question is: is there anything sane one can do after onException() is invoked,
or does this actually mean that the internal state of amq-cpp is corrupted beyond repair? If this
is indeed the case, the only option for me is to separate the sending of AMQ messages from the
main application.

Best,
Matevz


void XrdFileCloseReporterAmq::amq_connect()
{
  static const Exc_t _eh("XrdFileCloseReporterAmq::amq_connect ");

  TString uri;

  // 2012-07 Failover and Stomp don't splice ... or, they splice too well, making as
  // many threads as ulimit lets them on first error ... thrashing the machine.
  // uri.Form("failover://(tcp://%s:%hu?wireFormat=stomp)", mAmqHost.Data(), mAmqPort);

  uri.Form("tcp://%s:%hu?wireFormat=stomp", mAmqHost.Data(), mAmqPort);

  try
  {
    auto_ptr<cms::ConnectionFactory> conn_fac
      (new activemq::core::ActiveMQConnectionFactory(uri.Data(), mAmqUser.Data(), mAmqPswd.Data()));

    mConn = conn_fac->createConnection();
    mConn->setExceptionListener(this);
    mConn->start();
  }
  catch (cms::CMSException& e)
  {
    throw _eh + "Exception during connection creation: " + e.getStackTraceString();
  }

  try
  {
    mSess = mConn->createSession(); // Default is AUTO_ACKNOWLEDGE
    mDest = mSess->createTopic(mAmqTopic.Data());
    mProd = mSess->createProducer(mDest);
    mProd->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT); // Copied from examples, NFI.
  }
  catch (cms::InvalidDestinationException& e)
  {
    throw _eh + "Invalid destination exception during producer creation: " + e.getStackTraceString();
  }
  catch (cms::CMSException& e)
  {
    throw _eh + "Exception during session, topic or message producer initialization: " + e.getStackTraceString();
  }
}


void XrdFileCloseReporterAmq::onException(const cms::CMSException &e)
{
  static const Exc_t _eh("XrdFileCloseReporterAmq::onException ");

  if (*mLog)
    mLog->Form(ZLog::L_Error, _eh, "Exception callback invoked:\n    %s",
               e.getStackTraceString().c_str());

  cms::Connection *conn = mConn;
  mConn = 0;

  // This somehow results in proper exception getting thrown in the amq thread.
  // Go figure ... zen engineering or just a bug?
  //
  // 2013-03-07: Argh ... and this apparently crashes with activemq-cpp-3.5.0!
  // What to do?
  // Use mConn == 0 in main thread as a signal things should be reopened.
  // But do not call close :)
  if (conn)
  {
    conn->close();
    delete conn;
  }
}
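
The "use mConn == 0 in main thread as a signal" idea from the comment above could be sketched
roughly as below. This is only a sketch and has not been tried against activemq-cpp, so whether it
avoids the 3.5.0 crash is an open question. FakeConnection and Reporter are hypothetical stand-ins
(not CMS classes), and the no-argument onException() here merely mimics
cms::ExceptionListener::onException(). The point is that the callback only raises a flag, while
teardown and reconnect happen on the thread that owns the connection.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for the cms::Connection / Session / MessageProducer trio.
// It only records what was "sent"; it is not the real CMS API.
struct FakeConnection {
    std::vector<std::string> sent;
    void send(const std::string& msg) { sent.push_back(msg); }
};

// Hypothetical reporter; mirrors the role of XrdFileCloseReporterAmq only loosely.
class Reporter {
public:
    Reporter() : broken_(false), reconnects_(0) { connect(); }

    // Would be invoked on a library-owned thread. It only raises a flag and
    // never closes or deletes the connection from inside the callback.
    void onException() { broken_.store(true); }

    // Invoked on the application thread that owns the connection.
    void send(const std::string& msg) {
        // Clear the flag first, so an error reported during the reconnect
        // is picked up by the next call instead of being lost.
        if (broken_.exchange(false)) {
            conn_.reset();   // tear down on the owning thread
            connect();       // reopen; with real CMS this may throw
            ++reconnects_;
        }
        assert(conn_);
        conn_->send(msg);
    }

    int reconnects() const { return reconnects_; }
    const FakeConnection& connection() const { return *conn_; }

private:
    void connect() { conn_.reset(new FakeConnection()); }

    std::atomic<bool> broken_;              // set by callback, consumed by sender
    int reconnects_;                        // touched only by the sender thread
    std::unique_ptr<FakeConnection> conn_;  // touched only by the sender thread
};
```

In this arrangement a call such as reporter.send("record") on the sender thread is the only place
where the connection is destroyed and recreated, so the callback thread never touches it.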


