activemq-users mailing list archives

From David Lloyd <lloy0...@adam.com.au>
Subject ActiveMQ v5.0 + JDK 1.5.0 + MySQL (Too Many Connections)
Date Tue, 08 Jan 2008 04:03:27 GMT

Hi There,


I keep getting this:

2008-01-08 13:16:02,466 [main] FATAL ActiveMQTest ActiveMQTest.java (98) - Data source rejected establishment of connection,  message from server: "Too many connections"


It happens to both JMS and Stomp clients where the persistent header is set.

Notes before long listings:

* It only happens with MySQL
* It appears to work with PostgreSQL
* It also works with the default storage system [whatever that is]

The most reliable way for me to reproduce this error is to configure 
ActiveMQ to use MySQL via JDBC (or a journalled JDBC), run the first 
Java listing once with 98 messages and then run it again. I'll get an 
exception when I try to run it again.

Postgres on the other hand, whilst it wasn't particularly "fast", kept 
on processing messages; I manually kept re-running 100 or more messages 
until I stacked 50 000 messages in.
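
While reproducing this, it may help to watch how close the MySQL server is to its connection limit between runs. The following is a minimal sketch, not something taken from ActiveMQ: it assumes MySQL Connector/J is on the classpath, and the `url`, `user` and `password` system properties are hypothetical names chosen for illustration. It compares the server's `max_connections` variable with its `Threads_connected` status value.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class MySqlConnectionCheck {

    /** Connections still available before the server refuses new ones. */
    static int headroom(int maxConnections, int threadsConnected) {
        return Math.max(0, maxConnections - threadsConnected);
    }

    /** Runs a SHOW statement and returns the integer in its second ("Value") column. */
    static int readInt(Connection conn, String sql) throws SQLException {
        Statement st = conn.createStatement();
        try {
            ResultSet rs = st.executeQuery(sql);
            if (!rs.next()) {
                throw new SQLException("No row returned for: " + sql);
            }
            return rs.getInt(2);
        } finally {
            st.close(); // closing the statement also closes its result set
        }
    }

    public static void main(String[] args) {
        // Hypothetical property names; the defaults are assumptions for a local test database.
        String url = System.getProperty("url", "jdbc:mysql://localhost/activemq");
        String user = System.getProperty("user", "root");
        String password = System.getProperty("password", "");

        Connection conn = null;
        try {
            Class.forName("com.mysql.jdbc.Driver"); // explicit load, needed on older JDKs
            conn = DriverManager.getConnection(url, user, password);
            int max = readInt(conn, "SHOW VARIABLES LIKE 'max_connections'");
            int used = readInt(conn, "SHOW STATUS LIKE 'Threads_connected'");
            System.out.println("max_connections=" + max
                    + " Threads_connected=" + used
                    + " headroom=" + headroom(max, used));
        } catch (ClassNotFoundException ex) {
            System.err.println("MySQL driver not on classpath: " + ex.getMessage());
        } catch (SQLException ex) {
            System.err.println("Could not query MySQL: " + ex.getMessage());
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException ignored) {
                    // nothing useful to do if close fails
                }
            }
        }
    }
}
```

Running it before and after each test run would show whether Threads_connected keeps climbing while the broker is up. That would suggest connections are being opened and not released, although this check alone cannot say where.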

Any ideas would be appreciated as we'd prefer to keep our MySQL 
databases for when we want a persistent store (standard operating 
environment reasons).

DSL

===
PROGRAM LISTINGS AND MACHINE SPECS AS FOLLOWS (Long)

I have:

   SunOS isengaard 5.10 Generic_120012-14 i86pc i386 i86pc
   java full version "1.5.0_12-b04"
   MySQL server version: 5.0.45

The machine I'm running on is an Intel Quad Core E6600 with 4 GB of 
memory and a modern hard drive.

On the broker side, the error is logged as:

WARN  JDBCPersistenceAdapter         - Old message cleanup failed due to: java.io.IOException: Data source rejected establishment of connection,  message from server: "Too many connections"
java.io.IOException: Data source rejected establishment of connection, message from server: "Too many connections"
         at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:45)
         at org.apache.activemq.store.jdbc.TransactionContext.getConnection(TransactionContext.java:61)
         at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doDeleteOldMessages(DefaultJDBCAdapter.java:570)
         at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.cleanup(JDBCPersistenceAdapter.java:213)
         at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter$1.run(JDBCPersistenceAdapter.java:187)
         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:417)
         at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:280)
         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:135)
         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:65)
         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:142)
         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:166)
         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
         at java.lang.Thread.run(Thread.java:595)

My configuration is as follows:



<beans
   xmlns="http://www.springframework.org/schema/beans"
   xmlns:amq="http://activemq.org/config/1.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
   http://activemq.org/config/1.0 
http://activemq.apache.org/schema/activemq-core.xsd
   http://activemq.apache.org/camel/schema/spring 
http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

   <!-- Allows us to use system properties as variables in this 
configuration file -->
   <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

   <broker xmlns="http://activemq.org/config/1.0" brokerName="localhost" 
dataDirectory="${activemq.base}/data">
     <transportConnectors>
        <transportConnector name="openwire" uri="tcp://localhost:61616" 
discoveryUri="multicast://default"/>
        <transportConnector name="stomp"   uri="stomp://localhost:61613"/>
        <transportConnector name="xmpp"    uri="xmpp://localhost:61222"/>
     </transportConnectors>

     <networkConnectors>
       <networkConnector name="default-nc" uri="multicast://default"/>
     </networkConnectors>


     <persistenceAdapter>
         <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
     </persistenceAdapter>

     <managementContext>
        <managementContext connectorPort="1099" 
jmxDomainName="org.apache.activemq"/>
     </managementContext>

   </broker>

   <!-- lets create a command agent to respond to message based admin 
commands on the ActiveMQ.Agent topic -->
   <commandAgent xmlns="http://activemq.org/config/1.0"/>


   <!-- An embedded servlet engine for serving up the Admin console -->
   <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
     <connectors>
       <nioConnector port="8161" />
     </connectors>

     <handlers>
       <webAppContext contextPath="/admin" 
resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true" />
     </handlers>
   </jetty>

   <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" 
destroy-method="close">
     <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
     <property name="url" 
value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
     <property name="username" value="root"/>
     <property name="maxActive" value="5"/>
     <property name="password" value="passw0rd"/>
     <property name="poolPreparedStatements" value="true"/>
   </bean>

</beans>
<!-- END SNIPPET: example -->

I've tried fiddling with maxActive (from setting it really high, to 
setting it to -1 which means "as many as you want" according to "DBCP" 
docs).
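
To make the maxActive semantics concrete, here is a toy model of a pool cap. It is only a sketch under stated assumptions: `PoolCapSketch` is a hypothetical class, not DBCP code, and unlike a real pool it refuses a borrow immediately instead of waiting. It shows only that a non-negative cap limits how many connections are out at once, while a negative value removes the cap, so the only remaining limit is whatever the database server enforces.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolCapSketch {

    private final Semaphore permits; // null when the pool is uncapped (negative maxActive)
    private final AtomicInteger borrowed = new AtomicInteger();

    public PoolCapSketch(int maxActive) {
        this.permits = (maxActive < 0) ? null : new Semaphore(maxActive);
    }

    /** Returns true if a "connection" could be handed out without exceeding the cap. */
    public boolean tryBorrow() {
        if (permits != null && !permits.tryAcquire()) {
            return false;
        }
        borrowed.incrementAndGet();
        return true;
    }

    /** Returns a previously borrowed "connection" to the pool. */
    public void giveBack() {
        if (borrowed.get() == 0) {
            throw new IllegalStateException("nothing borrowed");
        }
        borrowed.decrementAndGet();
        if (permits != null) {
            permits.release();
        }
    }

    public int borrowed() {
        return borrowed.get();
    }

    /** Counts how many of the requested borrows the pool grants when nothing is returned. */
    static int grantWithoutReturning(PoolCapSketch pool, int requests) {
        int granted = 0;
        for (int i = 0; i < requests; i++) {
            if (pool.tryBorrow()) {
                granted++;
            }
        }
        return granted;
    }

    public static void main(String[] args) {
        System.out.println("maxActive=5  granted "
                + grantWithoutReturning(new PoolCapSketch(5), 200));
        System.out.println("maxActive=-1 granted "
                + grantWithoutReturning(new PoolCapSketch(-1), 200));
    }
}
```

In this model, 200 borrow attempts with nothing returned yield 5 grants under a cap of 5 and 200 grants with no cap. If connections really are not being returned somewhere, raising maxActive or setting it to -1 would only move the failure from the pool to the server's own limit.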

I don't seem to be able to diagnose what is actually wrong or why it's 
happening. The things I have noticed are:

1. This program will make it fail after about 50 messages:

/*
  * To change this template, choose Tools | Templates
  * and open the template in the editor.
  */
package au.com.adam.activemqtest;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

/**
  *
  * @author lloy0076
  *
  * If you set N - 2 messages, you should get N messages in the
  * queue (the extra two are the "reset" and "check" messages). Set
  * the count by using -Dmax=... on the Java command line.
  */
public class ActiveMQTest {

     Logger logger = Logger.getLogger(ActiveMQTest.class);
     private int max = 0;
     private int count = 0;
     private String queue_name = "";
     private final int DEFAULT_MAX = 100;
     private final String QUEUE_NAME = "activemq";

     public static void main(String[] args) {
         ActiveMQTest instance = new ActiveMQTest();

         instance.logger.debug("Starting test...");
         instance.getConfiguration();
         instance.sendMessages();
     }

     private void getConfiguration() {
         int local_max = DEFAULT_MAX;

         if (System.getProperty("max") != null) {
             local_max = 
Integer.valueOf(System.getProperty("max")).intValue();
         }

         logger.debug("Setting max to " + local_max);
         setMax(local_max);

         String local_queue_name = QUEUE_NAME;

         if (System.getProperty("queue_name") != null) {
             local_queue_name = System.getProperty("queue_name");
         }

         logger.debug("Setting queue name to " + local_queue_name);
         setQueue_name(local_queue_name);
     }

     private void sendMessages() {
         Connection conn = null;
         try {
             long start_time = System.currentTimeMillis();
             logger.debug("Start time: " + start_time);

             String user = ActiveMQConnection.DEFAULT_USER;
             String password = ActiveMQConnection.DEFAULT_PASSWORD;
             String broker_url = ActiveMQConnection.DEFAULT_BROKER_URL;

             logger.info("Connecting to: " + broker_url + ".");
             logger.info("Connecting as: " + user);
             logger.info("Password is: " + password);

             logger.info("Sending " + getMax() + " messages.");

             ActiveMQConnectionFactory connection_factory = new 
ActiveMQConnectionFactory("", "", broker_url);
             conn = connection_factory.createConnection();
             conn.start();

             Session session = conn.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
             Destination destination = session.createQueue(getQueue_name());
             MessageProducer producer = session.createProducer(destination);

             TextMessage expectation_message = 
session.createTextMessage("reset:::" + getMax());
             producer.send(expectation_message);

             for (int i = 0; i < getMax(); i++) {
                 TextMessage message = 
session.createTextMessage(Integer.toString(i));
                 producer.send(message);
             }

             TextMessage check_message = 
session.createTextMessage("check:::");
             producer.send(check_message);

             long end_time = System.currentTimeMillis();
             long duration = end_time - start_time;
             logger.debug("End time: " + end_time);
             logger.debug("Duration: " + duration);
         } catch (JMSException ex) {
             logger.fatal(ex.getMessage());
         } finally {
             if (conn != null) {
                 try {
                     logger.debug("Closing connection...");
                     conn.close();
                 } catch (Throwable ex) {
                     logger.fatal(ex.getMessage());
                 }
             }
         }
     }

     public int getMax() {
         return max;
     }

     public void setMax(int max) {
         this.max = max;
     }

     public int getCount() {
         return count;
     }

     public void setCount(int count) {
         this.count = count;
     }

     public String getQueue_name() {
         return queue_name;
     }

     public void setQueue_name(String queue_name) {
         this.queue_name = queue_name;
     }
}

2. Stomp (from Perl's Net::Stomp or Gozirra's Stomp) doesn't seem to trigger the error

#!/usr/bin/perl

use strict;
use warnings;

use Carp;
use Data::Dumper;

use Net::Stomp;

# send messages to the queue 'foo'
my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
$stomp->connect( { login => 'hello', passcode => 'there' } );

# Two less to cope with the messages *outside* of the while loop; 1000
# messages should be in the queue when this finishes processing.
my $max = 998;
my $count = 0;

$stomp->send( { destination => '/queue/foo', body => "reset:::$max", });

while ($count < $max) {
     # Double quotes so that $count is interpolated into the body.
     $stomp->send( { destination => '/queue/foo', body => "count::$count",
         persistent => 'true', } );
     print "x" if ($count %500 == 0);
     $count++;
     print "\n" if ($count % 5000 == 0);
}

$stomp->send( { destination => '/queue/foo', body => "check:::$count" } );

# Close the Stomp session cleanly.
$stomp->disconnect;


NOTE:

Non-persistent stomp messages go through fine:

/*
  * To change this template, choose Tools | Templates
  * and open the template in the editor.
  */
package au.com.adam.gozirrastomp;

import java.io.IOException;
import java.util.logging.Level;
import javax.security.auth.login.LoginException;
import net.ser1.stomp.Client;
import org.apache.log4j.Logger;

/**
  *
  * @author lloy0076
  */
public class GozirraMain {

     Logger logger = Logger.getLogger(GozirraMain.class);
     private int max = 0;
     private int count = 0;
     private String queue_name = "";
     private final int DEFAULT_MAX = 100;
     private final String QUEUE_NAME = "/queue/foo";

     /**
      * @param args the command line arguments
      */
     public static void main(String[] args) {
         GozirraMain instance = new GozirraMain();

         instance.getConfiguration();
         instance.sendMessages();

         System.exit(0);
     }

     private void getConfiguration() {
         int local_max = DEFAULT_MAX;

         if (System.getProperty("max") != null) {
             local_max = 
Integer.valueOf(System.getProperty("max")).intValue();
         }

         logger.debug("Setting max to " + local_max);
         setMax(local_max);

         String local_queue_name = QUEUE_NAME;

         if (System.getProperty("queue_name") != null) {
             local_queue_name = System.getProperty("queue_name");
         }

         logger.debug("Setting queue name to " + local_queue_name);
         setQueue_name(local_queue_name);
     }

     private void sendMessages() {
         try {
             long start_time = System.currentTimeMillis();
             logger.debug("Start time: " + start_time);

             String user = null;
             String password = null;
             String broker_url = "localhost";
             int port = 61613;

             logger.info("Connecting to: " + broker_url + ".");
             logger.info("Port: " + port);
             logger.info("Connecting as: " + user);
             logger.info("Password is: " + password);

             logger.info("Sending " + getMax() + " messages.");

             Client c = new Client(broker_url, port, "", "");

             c.send(getQueue_name(), "reset:::" + getMax());

             int i = 0;
             for (i = 0; i <= getMax(); i++) {
                 c.send(getQueue_name(), "count:::" + i);
             }

             c.send(getQueue_name(), "check");

             logger.info("Sent " + i + " messages.");

             long end_time = System.currentTimeMillis();
             long duration = end_time - start_time;
             logger.debug("End time: " + end_time);
             logger.debug("Duration: " + duration);
         } catch (IOException ex) {
 
java.util.logging.Logger.getLogger(GozirraMain.class.getName()).log(Level.SEVERE, 
null, ex);
         } catch (LoginException ex) {
 
java.util.logging.Logger.getLogger(GozirraMain.class.getName()).log(Level.SEVERE, 
null, ex);
         }
     }

     public int getMax() {
         return max;
     }

     public void setMax(int max) {
         this.max = max;
     }

     public int getCount() {
         return count;
     }

     public void setCount(int count) {
         this.count = count;
     }

     public String getQueue_name() {
         return queue_name;
     }

     public void setQueue_name(String queue_name) {
         this.queue_name = queue_name;
     }
}
