activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlaio <>
Subject Activemq CMS - high data rate lockup
Date Fri, 15 Jul 2011 19:08:21 GMT

Perhaps my search foo is weak, but I have been unable to find any recent
examples of people sending high data data rates over activemq CMS. -> has
some good data (although from 2007).

Have found some references to CMS 2.2 through the nabble search, but coming
up short at the moment for more recent applications.

I have a pretty simple data driven signal processing system, the processing
is essentially linear:


Data rates are highish (~110 Mbit/sec) but nothing astronomical, messages
vary in size, but are generally pretty big (average ~ 200kB, but can be up
to a 1MB).

I moved from a simple sockets to activemq (for portability reasons), I am
using activemq 5.4.2 and CMS 3.4.2 on a pretty peppy RHEL 5.3 64 bit machine
(2.6 Ghz 8 core Xeon with 12GB RAM) (the versions/OS that I use is defined
for me, so updating versions is a bigish deal (I know 5.5 is out)).

I can slow down the processing, and running with anything close to 40Mb/sec
results in the processes consuming a full CPU each (where as running without
activemq each process consumes ~ 20% of a processor) so I am concerned about
activemq's overhead (I suppose it is more likely the overhead of the CMS

I don't care about persistence, the data is time sensitive so if the
processing goes down it is of no value for me (video like data).  I have
disabled producerFlowControl (unsure if that was a good idea), but as a data
driven processing chain if I am not able to keep up with the data rate then
I am dead in the water.  Going to try nio for the transport as I have seen
that mentioned a few places for high throughput usage.

Seem to get into a deadlock state where the process is waiting on a mutex:

__kernel_vsyscall() at 0xffffe410	
pthread_cond_timedwait@@GLIBC_2.3.2() at 0xf7b5bd12	
decaf::internal::util::concurrent::ConditionImpl::wait() at 0xf77ce9f3	
decaf::util::concurrent::Mutex::wait() at 0xf784d767	
decaf::util::concurrent::Mutex::wait() at 0xf784d5f2	
activemq::transport::failover::FailoverTransport::oneway() at 0xf7540ed7	
activemq::transport::correlator::ResponseCorrelator::oneway() at 0xf752a2b8	
activemq::core::ActiveMQConnection::oneway() at 0xf746af12	
activemq::core::ActiveMQSession::send() at 0xf74bb9e2	
activemq::core::ActiveMQProducer::send() at 0xf74aa764	
activemq::core::ActiveMQProducer::send() at 0xf74a7dd1	
activemq::core::ActiveMQProducer::send() at 0xf74a9585	
activeMqProducer::send() at ioActiveMqProducer.cpp:116 0x805a235	
io::send() at io.cpp:162 0x8057614		
process_data_msg() at main.cpp:814 0x804cf7c	
process_message() at main.cpp:891 0x804d272	
main() at main.cpp:1,097 0x804e481	

I esentially used the cpp samples when I created my io library (just a
simple wrapper).  

Consumer code (Sanitised - so may not be syntactically correct) :

void activeMqConsumer::runConsumer()

	try {

		// Create a ConnectionFactory
		ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory( brokerURI );

		// Create a Connection
		connection = connectionFactory->createConnection();
		delete connectionFactory;

		ActiveMQConnection* amqConnection =
dynamic_cast&lt;ActiveMQConnection*&gt;( connection );
		if( amqConnection != NULL ) {
			amqConnection->addTransportListener( this );


		// Create a Session
		if( clientAck ) {
			session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
		} else {
			session = connection->createSession( Session::AUTO_ACKNOWLEDGE );

		// Create the destination (Topic or Queue)
		if( useTopic ) {
			destination = session->createTopic( destURI );
		} else {
			destination = session->createQueue( destURI );

		// Create a MessageConsumer from the Session to the Topic or Queue
		consumer = session->createConsumer( destination );
		consumer->setMessageListener( this );

	} catch (CMSException& e) {

// Called from the consumer since this class is a registered
void activeMqConsumer::onMessage( const Message* message )
	INT32 msgSize;
	INT32 bytesRead = 0;
	FLOAT64 latency = 0;
	TIME currentTime;
	INT32 rval;

	rval = SDL_SemWait(this->pElementAvailable);
	if (rval == -1)
		printf("(onMessage) error: SDL_SemWait failed\n");

		// cast the activeMq "message" into an array of bytes
		const cms::BytesMessage *bytesMessage = dynamic_cast<const
cms::BytesMessage *>(message);
	    if(bytesMessage != NULL)
	        msgSize = bytesMessage->getBodyLength();
			bytesRead = bytesMessage->readBytes((unsigned char*)this->pInputMsg,
			totalBytesRecvd += msgSize;
			// the assumption here is that you receive messages after they have been
			// if the times between processing modules are not synchronized all bets
are off.
			latency = (FLOAT64)currentTime - (FLOAT64)this->pInputMsg->hdr.time;
			this->accruedLatency += latency;

	      if(clientAck == TRUE)
   catch(cms::CMSException &exception)

	if(startTime == 0)

	// indicate that there is data available
	rval = SDL_SemPost(this->pDataAvailable);
	if (rval == -1)
		printf("(enqueue) error: SDL_SemPost failed\n");


Producer code:

activeMqProducer::activeMqProducer( const std::string& brokerURI,
                         const std::string& destURI,
                         BOOLEAN useTopic)
		// Create a ConnectionFactory
connectionFactory(ConnectionFactory::createCMSConnectionFactory( brokerURI )

	    // Create a Connection
	    connection = connectionFactory->createConnection();

	    sessionTransacted = false;

	    // Create a Session
	    if( sessionTransacted )
	        session = connection->createSession( Session::SESSION_TRANSACTED );
	        session = connection->createSession( Session::AUTO_ACKNOWLEDGE );

	    // Create the destination (Topic or Queue)
	    if( useTopic )
	        destination = session->createTopic( destURI );
	        destination = session->createQueue( destURI );

	    // Create a MessageProducer from the Session to the Topic or Queue
	    producer = session->createProducer( destination );
	    producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );

	    message = session->createBytesMessage();


void activeMqProducer::send(IO_MSG *pOutputMsg)
	// Set the send time in the IO message
	// Convert the message into an activeMQ bytesMessage
	message->setBodyBytes( (const unsigned char*)pOutputMsg,
pOutputMsg->hdr.msgSize + sizeof(MSG_HDR));
	// Send to broker
	producer->send( message );

Any suggestions on how to speed up the data rates and stop the producer from
locking up would be greatly appreciated. 


View this message in context:
Sent from the ActiveMQ - User mailing list archive at

View raw message