activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Gerdes, Mike" <Mike.Ger...@airbus.com>
Subject AW: limit the number of messages per second
Date Mon, 19 Jun 2006 09:11:22 GMT

OK, since the attachments didn't work, here is the Java receiver, modified so that it doesn't
need to be started with any parameters. The same is true for the sender, which is below the
receiver. Just start two receivers and one sender.

import java.util.Date;

import javax.jms.Connection;
import javax.jms.Message;
	import javax.jms.MessageConsumer;
	import javax.jms.Session;

	import org.apache.activemq.ActiveMQConnectionFactory;
	import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;

	/**
	 * Stand-alone JMS consumer for the queue "sizetest.source".
	 *
	 * Blocks until the first message arrives, then keeps receiving until no
	 * message shows up within {@link #RECEIVE_TIMEOUT_MS}. Finally prints how
	 * many messages were received and how long the stream lasted.
	 */
	public class TestIncMessageNumberReceiver {

		/** How long to wait for the next message before treating the stream as finished. */
		private static final long RECEIVE_TIMEOUT_MS = 30000;

		/**
		 * Connects to the broker at tcp://localhost:61636 as admin/admin and
		 * drains the queue.
		 *
		 * @param args ignored; the program takes no parameters
		 * @throws Exception if connecting to or receiving from the broker fails
		 */
		public static void main(String[] args) throws Exception {
			ActiveMQConnectionFactory factory =
					new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61636");
			// Explicitly disabled for this test (optimizeAck is mentioned as a
			// known issue for the 4.0 release in this thread).
			factory.setOptimizeAcknowledge(false);

			Connection connection = factory.createConnection();
			try {
				Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
				ActiveMQQueue subQueue = new ActiveMQQueue("sizetest.source");
				MessageConsumer consumer = session.createConsumer(subQueue);

				System.out.println("Connecting to JMS server.");
				connection.start();
				System.out.println("Connected.\n");

				int received = 0;
				// Wait without a timeout for the very first message so that the
				// clock only starts once the sender is actually running.
				Message message = consumer.receive();
				long startTime = System.currentTimeMillis();
				long lastMessageTime = startTime;

				while (message != null) {
					received++;
					// Remember when the most recent message arrived, so the idle
					// timeout at the end is not counted as receive time.
					lastMessageTime = System.currentTimeMillis();
					message = consumer.receive(RECEIVE_TIMEOUT_MS);
				}

				System.out.println(received + " messages received in "
						+ (lastMessageTime - startTime) / 1000 + " sec.");
			} finally {
				// Release the broker connection even if receiving failed.
				connection.close();
			}
			System.exit(0);
		}
	}


and the sender:

import java.util.Date;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;


/**
 * Stand-alone JMS producer for the queue "sizetest.source".
 *
 * Runs a fixed number of rounds. In each round it builds one BytesMessage of
 * the current size, sends that same message as fast as possible for
 * {@link #SEND_WINDOW_MS}, prints the throughput, and doubles the message
 * size for the next round.
 */
public class TestIncMessageNumberSender {

	/** Number of send rounds; the message size doubles after each one. */
	private static final int ROUNDS = 15;

	/** Duration of one send round in milliseconds. */
	private static final long SEND_WINDOW_MS = 30000;

	/** Payload size in bytes used for the first round. */
	private static final int INITIAL_MESSAGE_SIZE = 256;

	/**
	 * Connects to the broker at tcp://localhost:61636 as admin/admin and runs
	 * all send rounds.
	 *
	 * @param args ignored; the program takes no parameters
	 * @throws Exception if connecting to or sending to the broker fails
	 */
	public static void main(String[] args) throws Exception {
		ActiveMQConnectionFactory factory =
				new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61636");

		Connection connection = factory.createConnection();
		try {
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			ActiveMQQueue pubQueue = new ActiveMQQueue("sizetest.source");
			MessageProducer producer = session.createProducer(pubQueue);

			System.out.println("Connecting to JMS server.\n");
			connection.start();

			int messageSize = INITIAL_MESSAGE_SIZE;
			for (int round = 0; round < ROUNDS; round++) {
				// One payload per round, filled with messageSize bytes of value 1.
				BytesMessage payload = session.createBytesMessage();
				for (int written = 0; written < messageSize; written++) {
					payload.writeByte((byte) 1);
				}

				// Floating-point division so sizes below 1000 bytes are not reported as 0.
				System.out.println("The messages size is " + (messageSize / 1000.0) + "kb.");
				messageSize = messageSize * 2;

				int sentMessages = 0;
				long roundStart = System.currentTimeMillis();
				while ((System.currentTimeMillis() - roundStart) < SEND_WINDOW_MS) {
					producer.send(payload);
					sentMessages++;
				}

				System.out.println(sentMessages + " messages send.");
				System.out.println("The test took "
						+ ((System.currentTimeMillis() - roundStart) / 1000) + " seconds.");
				// Average over the nominal window length, as a whole number of messages.
				System.out.println("The average number of messages per second: "
						+ sentMessages / (SEND_WINDOW_MS / 1000) + "\n");
			}
		} finally {
			// Release the broker connection even if sending failed.
			connection.close();
		}

		System.exit(0);
	}
}

-----Ursprüngliche Nachricht-----
Von: Gerdes, Mike
Gesendet: Montag, 19. Juni 2006 11:02
An: activemq-users@geronimo.apache.org
Betreff: AW: limit the number of messages per second




OK, here are my AMQ config file and the Java applications that I used to reproduce the
error.

activemq.xml:

<!-- START SNIPPET: xbean -->
<!-- ActiveMQ broker configuration: one broker with JAAS authentication, an
     authorization map, a JDBC journal persistence adapter, one TCP transport
     connector and multicast network discovery. -->
<beans xmlns="http://activemq.org/config/1.0">



  <!-- Broker "JMSBroker" with JMX enabled.
       NOTE(review): persistent="false" is set here although a persistenceAdapter
       is declared below; confirm which of the two is intended to apply. -->
  <broker useJmx="true" brokerName="JMSBroker" persistent="false">


	<!-- Security: authentication via the JAAS configuration "activemq-domain";
	     the entries below give the "admins" group read, write and admin rights
	     on the queue and topic patterns ">" (presumably the match-all wildcard;
	     verify against the ActiveMQ wildcard documentation). -->
	<plugins>

		<jaasAuthenticationPlugin configuration="activemq-domain" />

		<authorizationPlugin>
			<map>
				<authorizationMap>
					<authorizationEntries>
						<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
						<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
					</authorizationEntries>

				</authorizationMap>
			</map>
		</authorizationPlugin>
	</plugins>



    <!-- Journaled JDBC store with 5 journal log files under /usr/servicemix/data. -->
    <persistenceAdapter>
      <journaledJDBC journalLogFiles="5" dataDirectory="/usr/servicemix/data"/>
      <!-- To use a different datasource, use the following syntax : -->
      <!--

      <journaledJDBC journalLogFiles="5" dataDirectory="../data" dataSource="#postgres-ds"/>
       -->
    </persistenceAdapter>



   <!-- Client-facing endpoint on tcp://localhost:61636, also advertised through
        multicast discovery.
        NOTE(review): needClientAuth/wantClientAuth look like SSL transport
        options; confirm they have any effect on a plain tcp:// URI. -->
   <transportConnectors>
       <transportConnector uri="tcp://localhost:61636?needClientAuth=true&amp;wantClientAuth=true"
discoveryUri="multicast://default"/>
    </transportConnectors>
 


    <!-- Broker-to-broker links: other brokers are found via multicast and joined
         with the admin/admin credentials. -->
    <networkConnectors>
      <!-- by default just auto discover the other brokers -->
      <networkConnector uri="multicast://default" userName="admin" password="admin"/>
      <!--


      <networkConnector uri="static://(tcp://host1:61616,tcp://host2:61616)"/>
      -->
    </networkConnectors>
 


  </broker>



  <!-- MySql DataSource Sample Setup -->
  <!--

  <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>
  -->


</beans>

To reproduce the problem, start AMQ, then start two receivers and one sender with the parameters
"queue amq tcp".



-----Ursprüngliche Nachricht-----
Von: Gerdes, Mike

Gesendet: Donnerstag, 15. Juni 2006 16:08
An: activemq-users@geronimo.apache.org
Betreff: AW: limit the number of messages per second




I guess that should be possible, because I manage to reproduce it on another machine. I will
try to do it tomorrow.

cya and thanks

-----Ursprüngliche Nachricht-----
Von: James Strachan [mailto:james.strachan@gmail.com]
Gesendet: Donnerstag, 15. Juni 2006 16:04
An: activemq-users@geronimo.apache.org
Betreff: Re: limit the number of messages per second



Is there any way you can create a test case that manages to crash the broker?

On 6/15/06, Gerdes, Mike <Mike.Gerdes@airbus.com> wrote:
>
> hm I will try to use prefetch and see if it works, but mostly the problem is not the
consumer, but the sender and the broker having to manage everything.
>
> The broker crashes, or rather it deadlocks. The usage managers for the queues
request more memory and get it; at some point all memory is used up, so they request
more and wait until they get it. That never happens, because the reference counters don't
get reduced and they think the queue is completely full. At that point the sender also stops
and waits for the queue to free up.
>
> I am using SSL and JAAS to allow only nodes that I trust to publish to the system, but
when systems get bigger a misconfiguration might occur, or somebody might find a trick or a way
around it. It is not that critical, but it would be very nice to have.
>
> I am using AMQ4.0 the SNAPSHOT 01.05.2006. I have tried to use persistent and non-persistent
messages, that makes no change.
>
> I haven't found the options on how to set it, that was before you added the warning and
send this email, so I will try it and hope thats the problem.
>
>
> -----Ursprüngliche Nachricht-----
> Von: James Strachan [mailto:james.strachan@gmail.com]
> Gesendet: Donnerstag, 15. Juni 2006 15:45
> An: activemq-users@geronimo.apache.org
> Betreff: Re: limit the number of messages per second
>
>
>
> On 6/15/06, Gerdes, Mike <Mike.Gerdes@airbus.com> wrote:
> >
> > I want this feature to make AMQ a bit more stable, to reduce network traffic and
prevent flooding and to avoid that a software with errors kills my system.
>
> You can use the prefetch for that...
> http://incubator.apache.org/activemq/what-is-the-prefetch-limit-for.html
>
> to limit how many messages are dispatched to a consumer.
>
>
> > What I have noticed is that AMQ has some memory problems: under certain
circumstances it hits 100% used memory and then crashes.
>
> Whats the crash? Is this the broker or client?
>
> >  But thats the smaller reason. The major reason is just for security. In my testing
and application I need to have a system that is more or less stable and secure. And DoS and
flooding of queues seems to me a big problem.
>
> How about using security so only nodes you trust can publish to your queues?
>
> > One thing about the memory problem I have encountered: with more than one consumer
on a queue, AMQ crashes with 100% used memory if I send as fast as possible. This happens
with and without async messages. What I noticed is that the usage managers request more and
more memory, at some point don't get it, and then block. When a message is read from a queue,
its memory is not freed. With only one consumer this has not been the case; there
the memory is freed.
> > (I put reference counter System.out.println into the usageManger)
>
> Which version of ActiveMQ are you using? Also are you using persistent
> queues? (I'm guessing not as that is not memory bound).
>
> If you are using 4.0 have you disabled optimizeAck? (see the known issues)
> http://incubator.apache.org/activemq/activemq-40-release.html
>
> --
>
> James
> -------
> http://radio.weblogs.com/0112098/
>
>
> This mail has originated outside your organization,
> either from an external partner or the Global Internet.
> Keep this in mind if you answer this message.
>
> This mail has originated outside your organization, either from an external partner or
the Global Internet. Keep this in mind if you answer this message.
>


--


James
-------
http://radio.weblogs.com/0112098/



This mail has originated outside your organization,
either from an external partner or the Global Internet.
Keep this in mind if you answer this message.

This mail has originated outside your organization, either from an external partner or the
Global Internet. Keep this in mind if you answer this message.



This mail has originated outside your organization,
either from an external partner or the Global Internet.
Keep this in mind if you answer this message.




This e-mail is intended only for the above addressee. It may contain
privileged information. If you are not the addressee you must not copy,
distribute, disclose or use any of the information in it. If you have
received it in error please delete it and immediately notify the sender.
Security Notice: all e-mail, sent to or from this address, may be
accessed by someone other than the recipient, for system management and
security reasons. This access is controlled under Regulation of
Investigatory Powers Act 2000, Lawful Business Practises.



This mail has originated outside your organization,
either from an external partner or the Global Internet.
Keep this in mind if you answer this message.

This mail has originated outside your organization, either from an external partner or the
Global Internet. Keep this in mind if you answer this message.

Mime
View raw message