activemq-dev mailing list archives

From "Thomas Buckel (JIRA)" <j...@apache.org>
Subject [jira] Created: (AMQ-1585) Problems with pure master/slave configuration
Date Sun, 17 Feb 2008 23:27:15 GMT
Problems with pure master/slave configuration
---------------------------------------------

                 Key: AMQ-1585
                 URL: https://issues.apache.org/activemq/browse/AMQ-1585
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 5.0.0, 4.1.1, 5.1.0
         Environment: Ubuntu 6.04, JDK 1.5.0_011, Spring 2.0.x
            Reporter: Thomas Buckel


As posted in the AMQ user forum:
http://www.nabble.com/Problems-with-Pure-Master-Slave-in-AMQ-5.0.0-to15471491s2354.html#a15474769
-------------------
Hi all,

I am having trouble setting up a *stable* ActiveMQ Pure Master/Slave topology.

Initially I tried v4.1.1, which failed with an exception. I found an AMQ JIRA ticket which
said that Pure Master/Slave didn't work in v4.1.1.
OK, so I switched to AMQ 5.0.0, created 2 configs (master/slave, see end of message) and ran
two AMQ instances (on the same box). Sometimes my test (see below) worked, but
more often I got various error messages like:

- On the slave:

ERROR Service                        - Async error occurred: javax.jms.JMSException: Slave
broker out of sync with master: Dispatched message (ID:tbuckel-desktop-41814-1202886136210-0:0:565:1:1)
was not in the pending list
javax.jms.JMSException: Slave broker out of sync with master: Dispatched message (ID:tbuckel-desktop-41814-1202886136210-0:0:565:1:1)
was not in the pending list
        at org.apache.activemq.broker.region.PrefetchSubscription.processMessageDispatchNotification(PrefetchSubscription.java:160)
        at org.apache.activemq.broker.region.AbstractRegion.processDispatchNotification(AbstractRegion.java:381)
        at org.apache.activemq.broker.region.RegionBroker.processDispatchNotification(RegionBroker.java:550)
        at org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201)
        at org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201)
        at org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201)
        at org.apache.activemq.broker.MutableBrokerFilter.processDispatchNotification(MutableBrokerFilter.java:211)
        at org.apache.activemq.broker.TransportConnection.processMessageDispatchNotification(TransportConnection.java:450)
        at org.apache.activemq.command.MessageDispatchNotification.visit(MessageDispatchNotification.java:77)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:281)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:178)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
        at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:202)
        at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
        at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)


- After having killed the master, stopped the slave, and copied the slave's data into the master's
data directory (as described in the Master/Slave recovery section), various error messages came up,
e.g. (internal) ActiveMQ topics were not available, the admin webapp showed exceptions, and
errors appeared on the client.
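
The copy step of that recovery procedure can be sketched as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, it needs Java 8 or later (java.nio.file and streams, so not the JDK 1.5 listed in the environment above), and both brokers are assumed to be stopped before it runs. It overwrites files that exist on both sides but does not delete files that exist only in the master's directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of ActiveMQ.
public class SlaveDataCopy {

    /** Recursively copies the slave's data directory into the master's data directory. */
    public static void copyTree(Path slaveData, Path masterData) throws IOException {
        List<Path> sources;
        try (Stream<Path> paths = Files.walk(slaveData)) {
            sources = paths.collect(Collectors.toList());
        }
        for (Path src : sources) {
            // Map each source path to the same relative location under the target.
            Path dst = masterData.resolve(slaveData.relativize(src).toString());
            if (Files.isDirectory(src)) {
                Files.createDirectories(dst);
            } else {
                Files.createDirectories(dst.getParent());
                Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: SlaveDataCopy <slave-data-dir> <master-data-dir>");
            return;
        }
        copyTree(Paths.get(args[0]), Paths.get(args[1]));
    }
}
```

With the configs at the end of this message, the two arguments would be the data-slave and data directories under ${activemq.base}.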

The test I've created uses Spring 2.0.x and pumps 1000 MapMessages into a queue through Spring's
JmsTemplate; each message is created within its own transaction, using JmsTransactionManager
and TransactionTemplate.
The created messages are consumed by a transactional DefaultMessageListenerContainer that is instantiated up front.
The AMQ JARs in the test's classpath are activemq-core-5.0.0.jar, geronimo-jms_1.1_spec-1.0.jar, and
geronimo-jta_1.0.1B_spec-1.0.jar, as I noticed really bad performance when only using
the activemq-all-5.0.0.jar (maybe this is the problem?).
The test code works without problems with OpenMQ, but I'd prefer using the nice Pure Master/Slave
ActiveMQ if I can get it running in a *stable* config ;)

I would highly appreciate any help or suggestions. Maybe my config is wrong or I am missing something
essential. I've also tried a recent AMQ 5.1 SNAPSHOT, which wasn't better...

See below for the small program I used to test (it is not a unit test, the behaviour appeared to be
non-deterministic to me, and it's not so nice as I've changed it quite often).

Thanks in advance,
Thomas

<!-- MASTER config -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.org/config/1.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.org/config/1.0 http://activemq.apache.org/schema/activemq-core.xsd
  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
  <broker xmlns="http://activemq.org/config/1.0" brokerName="master" dataDirectory="${activemq.base}/data">
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic="FOO.>" producerFlowControl="false" memoryLimit="1mb">
            <dispatchPolicy>
              <strictOrderDispatchPolicy/>
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>
        </policyEntries>
      </policyMap> 
    </destinationPolicy>

    <transportConnectors>
       <transportConnector name="openwire" uri="tcp://tbuckel-desktop:7778" />
    </transportConnectors>

    <networkConnectors/>

    <managementContext>
       <managementContext connectorPort="1100" jmxDomainName="org.apache.activemq"/>
    </managementContext>

  </broker>

  <commandAgent xmlns="http://activemq.org/config/1.0"/>

  <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
    <connectors>
      <nioConnector port="8161" />
    </connectors>

    <handlers>
      <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin"
logUrlOnStart="true" />
    </handlers>
  </jetty>
</beans>


<!-- SLAVE config -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.org/config/1.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.org/config/1.0 http://activemq.apache.org/schema/activemq-core.xsd
  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
  
  <broker xmlns="http://activemq.org/config/1.0" brokerName="slave" dataDirectory="${activemq.base}/data-slave"
          masterConnectorURI="tcp://tbuckel-desktop:7778">
  
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic="FOO.>" producerFlowControl="false" memoryLimit="1mb">
            <dispatchPolicy>
              <strictOrderDispatchPolicy/>
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>

    <transportConnectors>
       <transportConnector name="openwire" uri="tcp://localhost:7779"/>
    </transportConnectors>

    <networkConnectors/>

    <managementContext>
       <managementContext connectorPort="1101" jmxDomainName="org.apache.activemq"/>
    </managementContext>

  </broker>

  <commandAgent xmlns="http://activemq.org/config/1.0"/>

  <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
    <connectors>
      <nioConnector port="8162" />
    </connectors>

    <handlers>
      <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin"
logUrlOnStart="true" />
    </handlers>
  </jetty>

</beans>

------------
Test code:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import javax.jms.*;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class AnotherFailoverTest {

    public static final int MESSAGES = 1000;

    private final static List<BigInteger> notConsumedMessages = new ArrayList<BigInteger>(MESSAGES);

    private static ConnectionFactory createCF() throws Exception {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
        cf.setBrokerURL("failover://(tcp://localhost:7778,tcp://localhost:7779)?randomize=false");
        return new TransactionAwareConnectionFactoryProxy(cf);
    }

    private static void send() throws Exception {
        // The posted code referenced an undefined variable "pcf"; it is assumed here to be
        // the same connection factory that the transaction manager uses.
        final ConnectionFactory cf = createCF();
        JmsTransactionManager transactionManager = new JmsTransactionManager();
        transactionManager.setConnectionFactory(cf);
        transactionManager.afterPropertiesSet();

        int i=0;
        do {
            i++;
            final int number = i;
            try {
                final BigInteger v = new BigInteger(Integer.toString(number));
                TransactionTemplate tt = new TransactionTemplate(transactionManager);
                tt.execute(new TransactionCallbackWithoutResult() {
                    protected void doInTransactionWithoutResult(TransactionStatus status) {
                        final JmsTemplate template = new JmsTemplate(cf);
                        template.setSessionTransacted(true);
                        template.afterPropertiesSet();
                        template.send("testqueue", new MessageCreator() {
                            public Message createMessage(Session session) throws JMSException {
                                ObjectMessage dummyMessage = session.createObjectMessage();
                                dummyMessage.setObject(v);
                                synchronized (notConsumedMessages) {
                                    notConsumedMessages.add(v);
                                }
//                                System.out.println("Created message " + number + "(" + notConsumedMessages.size() + ")");
                                return dummyMessage;
                            }
                        });
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("Error creating message " + number);
            }
        } while (i < MESSAGES);
    }

    private static void setupReceiver() throws Exception {
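        // The posted code referenced an undefined variable "pcf"; it is assumed here to be
        // the same connection factory that the transaction manager uses.
        final ConnectionFactory cf = createCF();
        JmsTransactionManager transactionManager = new JmsTransactionManager();
        transactionManager.setConnectionFactory(cf);
        transactionManager.afterPropertiesSet();

        final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(cf);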
        container.setTransactionManager(transactionManager);
        container.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                try {
                    ObjectMessage msg = (ObjectMessage) message;
                    BigInteger number = (BigInteger) msg.getObject();
                    synchronized (notConsumedMessages) {
                        if (!notConsumedMessages.remove(number)) {
                            System.err.println("Message " + number + " not found in list!");
                        } else {
                            // System.out.println("Consumed message " + number);
                        }
                    }
                } catch (JMSException e) {
//                    e.printStackTrace();
                    System.out.println("Error consuming message!");
                }
            }
        });
        container.setSessionTransacted(true);
        container.setDestinationName("testqueue");
        container.setExceptionListener(new ExceptionListener() {
            public void onException(JMSException jmsException) {
                System.err.println(jmsException);
            }
        });
        container.afterPropertiesSet();
        container.initialize();
        TimeUnit.SECONDS.sleep(1);
    }

    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        setupReceiver();
        send();

        int remainingSize = 0;
        do {
            Thread.sleep(500);
            synchronized (notConsumedMessages) {
                remainingSize = notConsumedMessages.size();
            }
            // The posted code also appended an undefined variable "sb" here; it is dropped.
            System.out.println("Unconsumed " + remainingSize);
        } while (remainingSize > 0);

        System.out.println("All messages consumed.");
        long end = System.currentTimeMillis();
        System.out.println((end-start));
        System.exit(0);
    }

}
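
The broker URL in createCF() lists the master first and sets randomize=false, so clients try the master before the slave. A minimal sketch of building such a URL from a list of broker addresses is below; the class and method names are hypothetical and only the failover:// syntax used in the test above is assumed.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of ActiveMQ.
public class FailoverUri {

    /** Builds a failover URL that tries the brokers in the given order when randomize is false. */
    public static String build(List<String> brokers, boolean randomize) {
        if (brokers.isEmpty()) {
            throw new IllegalArgumentException("at least one broker address is required");
        }
        return "failover://(" + String.join(",", brokers) + ")?randomize=" + randomize;
    }

    public static void main(String[] args) {
        // Master first, then slave, matching the ports in the configs above.
        System.out.println(build(
                Arrays.asList("tcp://localhost:7778", "tcp://localhost:7779"), false));
    }
}
```

Running main prints the same URL that the test passes to setBrokerURL.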



-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

