Return-Path: Delivered-To: apmail-activemq-dev-archive@www.apache.org Received: (qmail 3385 invoked from network); 1 Jul 2008 15:06:31 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 1 Jul 2008 15:06:31 -0000 Received: (qmail 36291 invoked by uid 500); 1 Jul 2008 15:06:32 -0000 Delivered-To: apmail-activemq-dev-archive@activemq.apache.org Received: (qmail 36266 invoked by uid 500); 1 Jul 2008 15:06:32 -0000 Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list dev@activemq.apache.org Received: (qmail 36255 invoked by uid 99); 1 Jul 2008 15:06:32 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Jul 2008 08:06:32 -0700 X-ASF-Spam-Status: No, hits=-1999.8 required=10.0 tests=ALL_TRUSTED,WHOIS_MYPRIVREG X-Spam-Check-By: apache.org Received: from [140.211.11.140] (HELO brutus.apache.org) (140.211.11.140) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Jul 2008 15:05:40 +0000 Received: from brutus (localhost [127.0.0.1]) by brutus.apache.org (Postfix) with ESMTP id A0E3F234C154 for ; Tue, 1 Jul 2008 08:06:00 -0700 (PDT) Message-ID: <1684512520.1214924760657.JavaMail.jira@brutus> Date: Tue, 1 Jul 2008 08:06:00 -0700 (PDT) From: "ying (JIRA)" To: dev@activemq.apache.org Subject: [jira] Commented: (AMQ-1585) Problems with pure master/slave configuration In-Reply-To: <700095993.1203290835731.JavaMail.jira@brutus> MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 7bit X-Virus-Checked: Checked by ClamAV on apache.org [ https://issues.apache.org/activemq/browse/AMQ-1585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=43832#action_43832 ] ying commented on AMQ-1585: --------------------------- hi, Gary, I test on both win xp and solaris 9 jdk 1.5 To reproduce this issue, i have 1 client producer, I have 3 clients each with its own connection as a consumer 1. use original activemq.xml for master 2. add masterConnectorURI="tcp://localhost:61616" shutdownOnMasterFailure="false" in activemq.xml for slave 3. start master, start slave, start 3 consumers 4. start the producer to send one message, stop the producer ( the delivery is good) 5. start the producer again to send a second message, i will get the exception ( because of this exception, message will get redelivered according to the delivery policy) the jndi.properties of the client looks like the following: java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory java.naming.provider.url=failover://(tcp://localhost:61616,tcp://localhost:62616)?randomize=false connection.ConnectionFactory.redeliveryPolicy.initialRedeliveryDelay=300000 connection.ConnectionFactory.redeliveryPolicy.maximumRedeliveries=3 I have another issue for the pure master/slave, not sure whether it is related to this. It has the same setup as the above tests. the formula is: 1. use original activemq.xml for master 2. add masterConnectorURI="tcp://localhost:61616" shutdownOnMasterFailure="false" in activemq.xml for slave 3. start master, start slave, start 3 consumers 4. start the producer to send one message, stop the producer ( the delivery is good) 5. stop the master, stop the slave, do NOT stop 3 consumers, restart the master, restart the slave 6. start the producer to send one message, i will get the following exception: ERROR Service - Async error occurred: java.lang.IllegalStateException: Cannot lookup a connection that had not been registered: ID:yhe-1633-1214919996630-0:0 java.lang.IllegalStateException: Cannot lookup a connection that had not been registered: ID:yhe-1633-1214919996630-0:0 at org.apache.activemq.broker.MapTransportConnectionStateRegister.lookupConnectionState(MapTransportConnectionStateRegister.java:93) at org.apache.activemq.broker.TransportConnection.lookupConnectionState(TransportConnection.java:1356) at org.apache.activemq.broker.TransportConnection.processBeginTransaction(TransportConnection.java:362) at org.apache.activemq.command.TransactionInfo.visit(TransactionInfo.java:94) at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308) at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:180) at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:104) at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68) at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:205) at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98) at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36) DEBUG Service - Error occured while processing sync command: java.lang.IllegalStateException: Cannot lookup a consumer from a connection that had not been registered: ID:yhe-1633-1214919996630-0:0 java.lang.IllegalStateException: Cannot lookup a consumer from a connection that had not been registered: ID:yhe-1633-1214919996630-0:0 at org.apache.activemq.broker.MapTransportConnectionStateRegister.lookupConnectionState(MapTransportConnectionStateRegister.java:63) at org.apache.activemq.broker.TransportConnection.lookupConnectionState(TransportConnection.java:1344) at org.apache.activemq.broker.TransportConnection.getConsumerBrokerExchange(TransportConnection.java:1261) at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:458) at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205) at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308) at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:180) at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:104) at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68) at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:205) at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98) at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36) DEBUG Service - Error occured while processing sync command: java.lang.IllegalStateException: Cannot lookup a connection that had not been registered: ID:yhe-1633-1214919996630-0:0 java.lang.IllegalStateException: Cannot lookup a connection that had not been registered: ID:yhe-1633-1214919996630-0:0 at org.apache.activemq.broker.MapTransportConnectionStateRegister.lookupConnectionState(MapTransportConnectionStateRegister.java:93) at org.apache.activemq.broker.TransportConnection.lookupConnectionState(TransportConnection.java:1356) at org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransportConnection.java:413) at org.apache.activemq.command.TransactionInfo.visit(TransactionInfo.java:100) at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308) at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:180) at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:104) at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68) at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:205) at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98) at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36) I think the issue might be when the master is restarted and the consumer establish connections with it before the slave is started so the slave is not aware of those consumers/sessions/connection Please let me know whether i shall create another jira for the this IllegalStateException for pure master/slave BTW, is there any documentation of the component relationship for the activemq core, such as what the responsibility of BrokerService, Region, Broker, TransportConnection etc. I am looking into the source now to see what I can find. Thank you for your quick response. > Problems with pure master/slave configuration > --------------------------------------------- > > Key: AMQ-1585 > URL: https://issues.apache.org/activemq/browse/AMQ-1585 > Project: ActiveMQ > Issue Type: Bug > Components: Broker > Affects Versions: 4.1.1, 5.0.0, 5.1.0 > Environment: Ubuntu 6.04, JDK 1.5.0_011, Spring 2.0.x > Reporter: Thomas Buckel > Assignee: Gary Tully > Attachments: AMQ-1585.patch > > > As posted in the AMQ user forum: > http://www.nabble.com/Problems-with-Pure-Master-Slave-in-AMQ-5.0.0-to15471491s2354.html#a15474769 > ------------------- > Hi all, > I am having trouble setting up a *stable* ActiveMQ Pure Master/Slave topology. > Initially I have tried v4.1.1 which failed with an exception. I found an AMQ JIRA ticket which said that Pure/Master slave didn't work in v4.1.1. > Ok, so I switched to AMQ 5.0.0, created 2 configs (master/slave, see end of message) and ran two AMQ instances (on the same box) and most of the times my test (see below) worked, but more often I get various error messages like: > - On the slave: > ERROR Service - Async error occurred: javax.jms.JMSException: Slave broker out of sync with master: Dispatched message (ID:tbuckel-desktop-41814-1202886136210-0:0:565:1:1) was not in the pending list > javax.jms.JMSException: Slave broker out of sync with master: Dispatched message (ID:tbuckel-desktop-41814-1202886136210-0:0:565:1:1) was not in the pending list > at org.apache.activemq.broker.region.PrefetchSubscription.processMessageDispatchNotification(PrefetchSubscription.java:160) > at org.apache.activemq.broker.region.AbstractRegion.processDispatchNotification(AbstractRegion.java:381) > at org.apache.activemq.broker.region.RegionBroker.processDispatchNotification(RegionBroker.java:550) > at org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201) > at org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201) > at org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201) > at org.apache.activemq.broker.MutableBrokerFilter.processDispatchNotification(MutableBrokerFilter.java:211) > at org.apache.activemq.broker.TransportConnection.processMessageDispatchNotification(TransportConnection.java:450) > at org.apache.activemq.command.MessageDispatchNotification.visit(MessageDispatchNotification.java:77) > at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:281) > at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:178) > at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100) > at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67) > at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:202) > at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98) > at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36) > - After having killed the master, stopped the slave, copied the slave's data into the master's data directory various error message came up (as described in the Master/Slave recovery section), e.g. (internal) ActiveMQ topics were not available, the admin webApp showed exceptions and errors on the client. > The test I've created uses Spring 2.0.x and pumps 1000 MapMessages in a queue through Spring's JmsTempate, each message is created within its own transaction, using JmsTransactionManager and TransactionTemplate. > The created messages are consumed by an initially instantiated transactional DefaultMessageListenerContainer. The AMQ JARs in the test's classpath are activemq-core-5.0.0.jar, geronimo-jms_1.1_spec-1.0.jar, geronimo-jta_1.0.1B_spec-1.0.jar as I've noticed a really bad performance when only using the activemq-all-5.0.0.jar (maybe this is the problem?). > The test code work's without problems with OpenMQ, but I'd prefer using the nice Pure Master/Active ActiveMQ if I can get it running in a *stable* config ;) > I would highly appreciate any help or suggestions. Maybe my config is wrong or I miss something essential. I've also tried a recent AMQ 5.1 SNAPSHOT which wasn't better... > See below for the small program i used to test (no unit test, behaviour appeared to be non deterministic to me and it's not so nice as i've changed it quite often) > Thanks in advance, > Thomas > > xmlns="http://www.springframework.org/schema/beans" > xmlns:amq="http://activemq.org/config/1.0" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd > http://activemq.org/config/1.0 http://activemq.apache.org/schema/activemq-core.xsd > http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd"> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > xmlns="http://www.springframework.org/schema/beans" > xmlns:amq="http://activemq.org/config/1.0" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd > http://activemq.org/config/1.0 http://activemq.apache.org/schema/activemq-core.xsd > http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd"> > > > masterConnectorURI="tcp://tbuckel-desktop:7778"> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > ------------ > Test code: > import org.apache.activemq.ActiveMQConnectionFactory; > import org.springframework.jms.connection.JmsTransactionManager; > import org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy; > import org.springframework.jms.core.JmsTemplate; > import org.springframework.jms.core.MessageCreator; > import org.springframework.jms.listener.DefaultMessageListenerContainer; > import org.springframework.transaction.TransactionStatus; > import org.springframework.transaction.support.TransactionCallbackWithoutResult; > import org.springframework.transaction.support.TransactionTemplate; > import javax.jms.*; > import java.math.BigInteger; > import java.util.ArrayList; > import java.util.List; > import java.util.concurrent.TimeUnit; > public class AnotherFailoverTest { > public static final int MESSAGES = 1000; > private final static List notConsumedMessages = new ArrayList(MESSAGES); > private static ConnectionFactory createCF() throws Exception { > ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); > cf.setBrokerURL("failover://(tcp://localhost:7778,tcp://localhost:7779)?randomize=false"); > return new TransactionAwareConnectionFactoryProxy(cf); > } > private static void send() throws Exception { > JmsTransactionManager transactionManager = new JmsTransactionManager(); > transactionManager.setConnectionFactory(createCF()); > transactionManager.afterPropertiesSet(); > int i=0; > do { > i++; > final int number = i; > try { > final BigInteger v = new BigInteger(Integer.toString(number)); > TransactionTemplate tt = new TransactionTemplate(transactionManager); > tt.execute(new TransactionCallbackWithoutResult() { > protected void doInTransactionWithoutResult(TransactionStatus status) { > final JmsTemplate template = new JmsTemplate(pcf); > template.setSessionTransacted(true); > template.afterPropertiesSet(); > template.send("testqueue", new MessageCreator() { > public Message createMessage(Session session) throws JMSException { > ObjectMessage dummyMessage = session.createObjectMessage(); > dummyMessage.setObject(v); > synchronized (notConsumedMessages) { > notConsumedMessages.add(v); > } > // System.out.println("Created message " + number + "(" + notConsumedMessages.size() + ")"); > return dummyMessage; > } > }); > } > }); > } catch (Exception e) { > e.printStackTrace(); > System.out.println("Error creating message " + number); > } > } while (i < MESSAGES); > } > private static void setupReceiver() throws Exception { > JmsTransactionManager transactionManager = new JmsTransactionManager(); > transactionManager.setConnectionFactory(createCF()); > transactionManager.afterPropertiesSet(); > final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer(); > container.setConnectionFactory(pcf); > container.setTransactionManager(transactionManager); > container.setMessageListener(new MessageListener() { > public void onMessage(Message message) { > try { > ObjectMessage msg = (ObjectMessage) message; > BigInteger number = (BigInteger) msg.getObject(); > synchronized (notConsumedMessages) { > if (!notConsumedMessages.remove(number)) { > System.err.println("Message " + number + " not found in list!"); > } else { > // System.out.println("Consumed message " + number); > } > } > } catch (JMSException e) { > // e.printStackTrace(); > System.out.println("Error consuming message!"); > } > } > }); > container.setSessionTransacted(true); > container.setDestinationName("testqueue"); > container.setExceptionListener(new ExceptionListener() { > public void onException(JMSException jmsException) { > System.err.println(jmsException); > } > }); > container.afterPropertiesSet(); > container.initialize(); > TimeUnit.SECONDS.sleep(1); > } > public static void main(String[] args) throws Exception { > long start = System.currentTimeMillis(); > setupReceiver(); > send(); > int remainingSize = 0; > do { > Thread.sleep(500); > synchronized (notConsumedMessages) { > remainingSize = notConsumedMessages.size(); > } > System.out.println("Unconsumed " + remainingSize + ": " + sb); > } while (remainingSize > 0); > System.out.println("All messages consumed."); > long end = System.currentTimeMillis(); > System.out.println((end-start)); > System.exit(0); > } > } -- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.