activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Ehud Eshet (JIRA)" <j...@apache.org>
Subject [jira] [Created] (AMQ-5537) Network Connector Throughput
Date Sun, 25 Jan 2015 09:01:34 GMT
Ehud Eshet created AMQ-5537:
-------------------------------

             Summary: Network Connector Throughput
                 Key: AMQ-5537
                 URL: https://issues.apache.org/jira/browse/AMQ-5537
             Project: ActiveMQ
          Issue Type: Improvement
          Components: Connector
    Affects Versions: 5.x
         Environment: Network of Brokers. Platform agnostic. Local Broker has a networkConnector
defined to forward all messages to a remote broker.
            Reporter: Ehud Eshet
             Fix For: 5.11.0


*Requirement*

1.  Allow network connector to use transactions when forwarding persistent messages.
2. Provide the following new network connector properties:
maxMessagesPerTransaction - when specified and great than 1, use transactions.
maxTransactionLatencyMillis - commit immediately when time passed since last commit is more
than specified.

Let's say both parameters are set as 1000.
Network connector should commit after every 1000 messages or when more than 1000ms passed
since last commit (the sooner).

*Background*

Persistent messages throughput is significantly slower.

When using transactions and committing every 1000 messages, throughput on local broker with
levelDB is about 12,000 messages of 1KB per second.

Network connector does not use transactions. Thus, its throughput is limited to few hundreds
messages per second.

When imitating network connector functionality (receive from local broker and send to remote
broker) using transactions on both sessions, I managed to have a sustained throughput of 10,000
messages/sec stored on local broker plus up to 11,000 messages/s forwarded to remote broker
(forwarding throughput must be higher to allow catch up after reconnect).

*Sample code*

{code:title=TransactionalStoreAndForward.java|borderStyle=solid}
import java.util.Date;

import javax.jms.*;
import javax.jms.Connection;
import javax.jms.Message;

import org.apache.activemq.*;
import org.apache.activemq.broker.*;

public class TransactionalStoreAndForward implements Runnable 
{
	private final String m_queueName;
	private final ActiveMQConnectionFactory m_fromAMQF, m_toAMQF;
	
	private Connection m_fromConn = null, m_toConn = null;
	private Session m_fromSess = null, m_toSess = null;
	private MessageConsumer m_msgConsumer = null;
	private MessageProducer m_msgProducer = null;
	
	private boolean m_cont = true;
	
	public static final int MAX_MESSAGES_PER_TRANSACTION = 500;
	public static final long MAX_TRANSACTION_LATENCY_MILLIS = 5000L;
	
	public TransactionalStoreAndForward(String fromUri, String toUri, String queueName)
	{
		m_fromAMQF = new ActiveMQConnectionFactory(fromUri);
		m_toAMQF = new ActiveMQConnectionFactory(toUri);
		m_queueName = queueName;
	}
	
	@Override
	public void run() 
	{
		while (m_cont)
		{
			connect();
			process();
		}
	}
	
	private void process()
	{
		long txMessages = 0, totalMessages = 0, lastPrintMessages = 0;
		long startTime = 0L;
		long lastTxTime = startTime, lastPrintTime = startTime;
		
		Message msg = null;
		
		try {
			while (m_cont)
			{
				while ((msg = m_msgConsumer.receive(MAX_TRANSACTION_LATENCY_MILLIS)) != null)
				{
					if (startTime == 0) {
						startTime = System.currentTimeMillis();
						lastTxTime = startTime;
						lastPrintTime = startTime;
					}
					
					m_msgProducer.send(msg);
					txMessages++;
					totalMessages++;
					
					if (txMessages == MAX_MESSAGES_PER_TRANSACTION || 
							System.currentTimeMillis() - lastTxTime > MAX_TRANSACTION_LATENCY_MILLIS)
					{
						m_toSess.commit();
						m_fromSess.commit();
						lastTxTime = System.currentTimeMillis();
						txMessages = 0;
					}
					
					if (System.currentTimeMillis() - lastPrintTime > 10000L) {
						System.out.println("processed " + (totalMessages - lastPrintMessages) + " messages during
last 10 seconds. Avg. messages/s: " + (totalMessages * 1000L / (System.currentTimeMillis()
- startTime)) + " at " + new Date());
						lastPrintTime = System.currentTimeMillis();
						lastPrintMessages = totalMessages;
					}
				}
				
				if (txMessages > 0)
				{
					m_toSess.commit();
					m_fromSess.commit();
					lastTxTime = System.currentTimeMillis();
					txMessages = 0;
				}
				else {
					System.out.println("Idle for more than a minute at " + new Date());
				}
			}
		}
		catch(JMSException jmse)
		{
			System.out.println("About to rollback " + txMessages + " messages due to: " + jmse.getMessage());
			try {
				m_toSess.rollback();
				m_fromSess.rollback();
				System.out.println("Rollback completed. will reconnect soon ...");
			}
			catch (JMSException re)
			{
				System.out.println("Rollback failed !!!");
				re.printStackTrace();
			}
		}
	}
	
	private void connect()
	{
		boolean isNotOK = true;
		String target = null;
		while (isNotOK) 
		{
			try {
				if (m_fromConn != null) 
				{
					m_fromConn.close();
					m_fromConn = null;
				}
				
				if (m_toConn != null) 
				{
					m_toConn.close();
					m_toConn = null;
				}
				
				target = m_fromAMQF.getBrokerURL();
				m_fromConn = m_fromAMQF.createConnection();
				m_fromConn.start();
				m_fromSess = m_fromConn.createSession(true, Session.AUTO_ACKNOWLEDGE);
				Destination fromDest = m_fromSess.createQueue(m_queueName);
				m_msgConsumer = m_fromSess.createConsumer(fromDest);
				
				target = m_toAMQF.getBrokerURL();
				m_toConn = m_toAMQF.createConnection();
				m_toConn.start();
				m_toSess = m_toConn.createSession(true, Session.AUTO_ACKNOWLEDGE);
				Destination toDest = m_toSess.createQueue(m_queueName);
				m_msgProducer = m_toSess.createProducer(toDest);
				isNotOK = false;
				System.out.println("Successful connection at " + new Date());
			}
			catch(Exception e) {
				System.out.println("Failed to connect to " + target + " due to: " + e.getMessage());
				try {
					Thread.sleep(60000L);
				} catch (InterruptedException e1) {}
				
				System.out.println("About to retry connection at " + new Date());
			}
		}
	}
	
	public void cleanup() throws Exception
	{
		m_cont = false;
		
		if (m_fromConn != null) 
		{
			m_fromConn.close();
			m_fromConn = null;
		}
		
		if (m_toConn != null) 
		{
			m_toConn.close();
			m_toConn = null;
		}
	}
	
	public static void main(String[] args) throws Exception
	{
		BrokerService broker = BrokerFactory.createBroker("xbean:activemq_gateway.xml", true);
		broker.waitUntilStarted();
		TransactionalStoreAndForward tsaf = new TransactionalStoreAndForward("vm://AuditGW", "tcp://10.2.154.51:61616",
"AUDIT.EVENT");
		Thread t = new Thread(tsaf);
		t.start();
		t.join();
		tsaf.cleanup();
		broker.stop();
		broker.waitUntilStopped();
	}

}
{code}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message