activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From luke1 <ma...@tis.hr>
Subject multiple producers and InvalidClientIDException problem, producers wait
Date Tue, 09 May 2006 14:24:17 GMT

When trying to start multiple producers from multiple threads on single
queue, using JmsTemplate with PooledConnectionFactory, I get couple of
following exceptions, and producer threads get stuck in a wait :

[INFO] Service - Sync error occurred: javax.jms.InvalidClientIDException:
Broker: localhost - Client: ID:kiss-1890-1147183464931-3:0 already connected
<javax.jms.InvalidClientIDException: Broker: localhost - Client:
ID:kiss-1890-1147183464931-3:0 already
connected>javax.jms.InvalidClientIDException: Broker: localhost - Client:
ID:kiss-1890-1147183464931-3:0 already connected
	at
org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:154)
	at
org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:65)
	at
org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:67)
	at
org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:65)
	at
org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:77)
	at
org.apache.activemq.broker.AbstractConnection.processAddConnection(AbstractConnection.java:500)
	at
org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:106)
	at
org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:196)
	at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:62)
	at
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:88)
	at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:70)
	at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:75)
	at
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
	at
org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:63)
	at
org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:68)
	at
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1108)
	at
org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1196)
	at
org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:252)
	at org.apache.activemq.pool.SessionPool.createSession(SessionPool.java:116)
	at org.apache.activemq.pool.SessionPool.makeObject(SessionPool.java:84)
	at
org.apache.commons.pool.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:771)
	at org.apache.activemq.pool.SessionPool.borrowSession(SessionPool.java:59)
	at
org.apache.activemq.pool.ConnectionPool.createSession(ConnectionPool.java:67)
	at
org.apache.activemq.pool.PooledConnection.createSession(PooledConnection.java:123)
	at
org.springframework.jms.core.JmsTemplate.createSession(JmsTemplate.java:408)
	at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:496)
	at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:545)
	at ProducerTest$ProducerRunnable.run(ProducerTest.java:83)

After that, some of the threads continue sending messages, while others are
stuck in a wait: 
Thread-9@1d2, priority=5, in group 'main', status: 'WAIT'
	  wait():-1, Object.java
	  wait():429, Object.java
	  waitFor():267, FutureTask.java
	  get():117, FutureTask.java
	  getResult():44, FutureResponse.java
	  request():69, ResponseCorrelator.java
	  syncSendPacket():1108, ActiveMQConnection.java
	  ensureConnectionInfoSent():1196, ActiveMQConnection.java
	  createSession():252, ActiveMQConnection.java
	  createSession():116, SessionPool.java
	  makeObject():84, SessionPool.java
	  borrowObject():771, GenericObjectPool.java
	  borrowSession():59, SessionPool.java
	  createSession():67, ConnectionPool.java
	  createSession():123, PooledConnection.java
	  createSession():408, JmsTemplate.java
	  execute():496, JmsTemplate.java
	  send():545, JmsTemplate.java

Test code:

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;

import junit.framework.TestCase;

import javax.jms.Destination;
import javax.jms.Session;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;

public class ProducerTest extends TestCase {

    public void testMultipleProducers() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setUseLoggingForShutdownErrors(true);
        broker.setUseShutdownHook(true);
        broker.setBrokerName("localhost");
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.setTransportConnectorURIs(new String[] {"vm://localhost"});
        broker.start();

        ActiveMQConnectionFactory jmsFactory = new
ActiveMQConnectionFactory();
        jmsFactory.setBrokerURL("vm://localhost");

        PooledConnectionFactory pooledConnectionFactory = new
PooledConnectionFactory(jmsFactory);

        JmsTemplate sendTemplate = new JmsTemplate();
        sendTemplate.setPubSubDomain(false);
        sendTemplate.setConnectionFactory(pooledConnectionFactory);

        Destination testDestination = new ActiveMQQueue("TEST_QUEUE");

        int producerCount = 10;
        ProducerRunnable[] producerRunnables = new
ProducerRunnable[producerCount];

        for (int i=0; i<producerCount; i++) {
            producerRunnables[i] = new ProducerRunnable(sendTemplate,
testDestination, "Producer_" + i);
        }

        for (int i=0; i<producerCount; i++) {
            new Thread(producerRunnables[i]).start();
        }

        System.out.println("Started all producers");

        synchronized (this) {
            wait(60000);
        }

        broker.stop();
    }

    private static class ProducerRunnable implements Runnable {
        private JmsTemplate template;
        private Destination destination;
        private String name;

        public ProducerRunnable(JmsTemplate template, Destination
destination, String name) {
            this.template = template;
            this.destination = destination;
            this.name = name;
        }

        public void run() {
            final String messagePrefix = "test";

            System.out.println("Starting producer: " + name);

            for (int i=0; i<1000; i++) {
                final String message = messagePrefix + i;

                template.send(destination, new MessageCreator() {
                    public javax.jms.Message createMessage(Session session)
throws JMSException {
                        ObjectMessage objMessage =
session.createObjectMessage();
                        objMessage.setObject(message);
                        return objMessage;
                    }
                });

                System.out.println(name + " sent: " + message);
            }
            System.out.println("Producer " + name + " finished");
        }
    }
}

I guess it's some sort of a race condition. If I add some initial delay when
starting up those threads (before sending the first message), then all of
them get a connection and continue sending messages.

I've tried 4.0-RC2 and 4.0R-C3. Same thing happens with tcp
connector/transport.

TIA
--
View this message in context: http://www.nabble.com/multiple-producers-and-InvalidClientIDException-problem%2C-producers-wait-t1584904.html#a4300897
Sent from the ActiveMQ - User forum at Nabble.com.


Mime
View raw message