activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Ehud Eshet (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQ-5537) Network Connector Throughput
Date Wed, 28 Jan 2015 09:20:34 GMT

    [ https://issues.apache.org/jira/browse/AMQ-5537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14294913#comment-14294913
] 

Ehud Eshet commented on AMQ-5537:
---------------------------------

I hope I was not too rude by specifying Fix Version as 5.11.
Where is the place or the mechanism to discuss such enhancement requests?



> Network Connector Throughput
> ----------------------------
>
>                 Key: AMQ-5537
>                 URL: https://issues.apache.org/jira/browse/AMQ-5537
>             Project: ActiveMQ
>          Issue Type: Improvement
>          Components: Connector
>    Affects Versions: 5.x
>         Environment: Network of Brokers. Platform agnostic. Local Broker has a networkConnector
defined to forward all messages to a remote broker.
>            Reporter: Ehud Eshet
>
> *Requirement*
> 1.  Allow network connector to use transactions when forwarding persistent messages.
> 2. Provide the following new network connector properties:
> maxMessagesPerTransaction - when specified and great than 1, use transactions.
> maxTransactionLatencyMillis - commit immediately when time passed since last commit is
more than specified.
> Let's say both parameters are set as 1000.
> Network connector should commit after every 1000 messages or when more than 1000ms passed
since last commit (the sooner).
> *Background*
> Persistent messages throughput is significantly slower.
> When using transactions and committing every 1000 messages, throughput on local broker
with levelDB is about 12,000 messages of 1KB per second.
> Network connector does not use transactions. Thus, its throughput is limited to few hundreds
messages per second.
> When imitating network connector functionality (receive from local broker and send to
remote broker) using transactions on both sessions, I managed to have a sustained throughput
of 10,000 messages/sec stored on local broker plus up to 11,000 messages/s forwarded to remote
broker (forwarding throughput must be higher to allow catch up after reconnect).
> *Sample code*
> {code:title=TransactionalStoreAndForward.java|borderStyle=solid}
> import java.util.Date;
> import javax.jms.*;
> import javax.jms.Connection;
> import javax.jms.Message;
> import org.apache.activemq.*;
> import org.apache.activemq.broker.*;
> public class TransactionalStoreAndForward implements Runnable 
> {
> 	private final String m_queueName;
> 	private final ActiveMQConnectionFactory m_fromAMQF, m_toAMQF;
> 	
> 	private Connection m_fromConn = null, m_toConn = null;
> 	private Session m_fromSess = null, m_toSess = null;
> 	private MessageConsumer m_msgConsumer = null;
> 	private MessageProducer m_msgProducer = null;
> 	
> 	private boolean m_cont = true;
> 	
> 	public static final int MAX_MESSAGES_PER_TRANSACTION = 500;
> 	public static final long MAX_TRANSACTION_LATENCY_MILLIS = 5000L;
> 	
> 	public TransactionalStoreAndForward(String fromUri, String toUri, String queueName)
> 	{
> 		m_fromAMQF = new ActiveMQConnectionFactory(fromUri);
> 		m_toAMQF = new ActiveMQConnectionFactory(toUri);
> 		m_queueName = queueName;
> 	}
> 	
> 	@Override
> 	public void run() 
> 	{
> 		while (m_cont)
> 		{
> 			connect();
> 			process();
> 		}
> 	}
> 	
> 	private void process()
> 	{
> 		long txMessages = 0, totalMessages = 0, lastPrintMessages = 0;
> 		long startTime = 0L;
> 		long lastTxTime = startTime, lastPrintTime = startTime;
> 		
> 		Message msg = null;
> 		
> 		try {
> 			while (m_cont)
> 			{
> 				while ((msg = m_msgConsumer.receive(MAX_TRANSACTION_LATENCY_MILLIS)) != null)
> 				{
> 					if (startTime == 0) {
> 						startTime = System.currentTimeMillis();
> 						lastTxTime = startTime;
> 						lastPrintTime = startTime;
> 					}
> 					
> 					m_msgProducer.send(msg);
> 					txMessages++;
> 					totalMessages++;
> 					
> 					if (txMessages == MAX_MESSAGES_PER_TRANSACTION || 
> 							System.currentTimeMillis() - lastTxTime > MAX_TRANSACTION_LATENCY_MILLIS)
> 					{
> 						m_toSess.commit();
> 						m_fromSess.commit();
> 						lastTxTime = System.currentTimeMillis();
> 						txMessages = 0;
> 					}
> 					
> 					if (System.currentTimeMillis() - lastPrintTime > 10000L) {
> 						System.out.println("processed " + (totalMessages - lastPrintMessages) + " messages
during last 10 seconds. Avg. messages/s: " + (totalMessages * 1000L / (System.currentTimeMillis()
- startTime)) + " at " + new Date());
> 						lastPrintTime = System.currentTimeMillis();
> 						lastPrintMessages = totalMessages;
> 					}
> 				}
> 				
> 				if (txMessages > 0)
> 				{
> 					m_toSess.commit();
> 					m_fromSess.commit();
> 					lastTxTime = System.currentTimeMillis();
> 					txMessages = 0;
> 				}
> 				else {
> 					System.out.println("Idle for more than a minute at " + new Date());
> 				}
> 			}
> 		}
> 		catch(JMSException jmse)
> 		{
> 			System.out.println("About to rollback " + txMessages + " messages due to: " + jmse.getMessage());
> 			try {
> 				m_toSess.rollback();
> 				m_fromSess.rollback();
> 				System.out.println("Rollback completed. will reconnect soon ...");
> 			}
> 			catch (JMSException re)
> 			{
> 				System.out.println("Rollback failed !!!");
> 				re.printStackTrace();
> 			}
> 		}
> 	}
> 	
> 	private void connect()
> 	{
> 		boolean isNotOK = true;
> 		String target = null;
> 		while (isNotOK) 
> 		{
> 			try {
> 				if (m_fromConn != null) 
> 				{
> 					m_fromConn.close();
> 					m_fromConn = null;
> 				}
> 				
> 				if (m_toConn != null) 
> 				{
> 					m_toConn.close();
> 					m_toConn = null;
> 				}
> 				
> 				target = m_fromAMQF.getBrokerURL();
> 				m_fromConn = m_fromAMQF.createConnection();
> 				m_fromConn.start();
> 				m_fromSess = m_fromConn.createSession(true, Session.AUTO_ACKNOWLEDGE);
> 				Destination fromDest = m_fromSess.createQueue(m_queueName);
> 				m_msgConsumer = m_fromSess.createConsumer(fromDest);
> 				
> 				target = m_toAMQF.getBrokerURL();
> 				m_toConn = m_toAMQF.createConnection();
> 				m_toConn.start();
> 				m_toSess = m_toConn.createSession(true, Session.AUTO_ACKNOWLEDGE);
> 				Destination toDest = m_toSess.createQueue(m_queueName);
> 				m_msgProducer = m_toSess.createProducer(toDest);
> 				isNotOK = false;
> 				System.out.println("Successful connection at " + new Date());
> 			}
> 			catch(Exception e) {
> 				System.out.println("Failed to connect to " + target + " due to: " + e.getMessage());
> 				try {
> 					Thread.sleep(60000L);
> 				} catch (InterruptedException e1) {}
> 				
> 				System.out.println("About to retry connection at " + new Date());
> 			}
> 		}
> 	}
> 	
> 	public void cleanup() throws Exception
> 	{
> 		m_cont = false;
> 		
> 		if (m_fromConn != null) 
> 		{
> 			m_fromConn.close();
> 			m_fromConn = null;
> 		}
> 		
> 		if (m_toConn != null) 
> 		{
> 			m_toConn.close();
> 			m_toConn = null;
> 		}
> 	}
> 	
> 	public static void main(String[] args) throws Exception
> 	{
> 		BrokerService broker = BrokerFactory.createBroker("xbean:activemq_gateway.xml", true);
> 		broker.waitUntilStarted();
> 		TransactionalStoreAndForward tsaf = new TransactionalStoreAndForward("vm://AuditGW",
"tcp://10.2.154.51:61616", "AUDIT.EVENT");
> 		Thread t = new Thread(tsaf);
> 		t.start();
> 		t.join();
> 		tsaf.cleanup();
> 		broker.stop();
> 		broker.waitUntilStopped();
> 	}
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message