activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gary Tully <gary.tu...@gmail.com>
Subject Re: svn commit: r883411 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
Date Mon, 23 Nov 2009 18:49:53 GMT
Committed a fix for the protected-access compile error, but the test fails
for me.

2009/11/23 Gary Tully <gary.tully@gmail.com>

> This breaks the build: getMBeanServer() is intentionally protected in
> ManagementContext.
>
> >
> activemq_t/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java:[364,33]
> getMBeanServer() has protected access in
> org.apache.activemq.broker.jmx.ManagementContext
>
> 2009/11/23 <bsnyder@apache.org>
>
> Author: bsnyder
>> Date: Mon Nov 23 17:02:39 2009
>> New Revision: 883411
>>
>> URL: http://svn.apache.org/viewvc?rev=883411&view=rev
>> Log:
>> Updated test for AMQ-2324 and AMQ-2484
>>
>> Modified:
>>
>>  activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
>>
>> Modified:
>> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=883411&r1=883410&r2=883411&view=diff
>>
>> ==============================================================================
>> ---
>> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
>> Mon Nov 23 17:02:39 2009
>> @@ -2,35 +2,102 @@
>>
>>  import java.io.File;
>>  import java.io.IOException;
>> +import java.net.URI;
>> +import java.net.URISyntaxException;
>> +import java.util.ArrayList;
>> +import java.util.Enumeration;
>> +import java.util.HashMap;
>> +import java.util.Hashtable;
>> +import java.util.List;
>> +import java.util.Map;
>> +import java.util.concurrent.TimeUnit;
>>
>> +import javax.jms.Connection;
>>  import javax.jms.DeliveryMode;
>> -
>> -import junit.framework.Test;
>> -
>> +import javax.jms.MessageNotWriteableException;
>> +import javax.jms.Queue;
>> +import javax.jms.QueueBrowser;
>> +import javax.jms.Session;
>> +import javax.management.MBeanServer;
>> +import javax.management.ObjectName;
>> +
>> +import junit.framework.TestCase;
>> +
>> +import org.apache.activemq.ActiveMQConnectionFactory;
>> +import org.apache.activemq.broker.BrokerService;
>> +import org.apache.activemq.broker.BrokerTestSupport;
>>  import org.apache.activemq.broker.StubConnection;
>> +import org.apache.activemq.broker.TransportConnector;
>> +import org.apache.activemq.broker.jmx.ManagementContext;
>>  import org.apache.activemq.command.ActiveMQDestination;
>> -import org.apache.activemq.command.Command;
>> +import org.apache.activemq.command.ActiveMQTextMessage;
>> +import org.apache.activemq.command.ConnectionId;
>>  import org.apache.activemq.command.ConnectionInfo;
>>  import org.apache.activemq.command.ConsumerInfo;
>> +import org.apache.activemq.command.DestinationInfo;
>>  import org.apache.activemq.command.Message;
>>  import org.apache.activemq.command.MessageAck;
>> +import org.apache.activemq.command.MessageDispatch;
>> +import org.apache.activemq.command.MessageId;
>>  import org.apache.activemq.command.ProducerInfo;
>>  import org.apache.activemq.command.SessionInfo;
>>  import org.apache.activemq.transport.Transport;
>> -import org.apache.activemq.transport.TransportListener;
>> +import org.apache.activemq.transport.TransportFactory;
>>  import org.apache.activemq.util.Wait;
>>  import org.apache.commons.io.FileUtils;
>>  import org.apache.commons.logging.Log;
>>  import org.apache.commons.logging.LogFactory;
>>
>> -public class BrokerNetworkWithStuckMessagesTest extends
>> NetworkTestSupport {
>> -
>> +/**
>> + * This class duplicates most of the functionality in {@link
>> NetworkTestSupport}
>> + * and {@link BrokerTestSupport} because more control was needed over how
>> brokers
>> + * and connectors are created. Also, this test asserts message counts via
>> JMX on
>> + * each broker.
>> + *
>> + * @author bsnyder
>> + *
>> + */
>> +public class BrokerNetworkWithStuckMessagesTest extends TestCase
>> /*NetworkTestSupport*/ {
>> +
>>     private static final Log LOG =
>> LogFactory.getLog(BrokerNetworkWithStuckMessagesTest.class);
>> -
>> -       private DemandForwardingBridge bridge;
>> -
>> -       protected void setUp() throws Exception {
>> -        super.setUp();
>> +
>> +    private BrokerService localBroker;
>> +    private BrokerService remoteBroker;
>> +    private DemandForwardingBridge bridge;
>> +
>> +    protected Map<String, BrokerService> brokers = new HashMap<String,
>> BrokerService>();
>> +    protected ArrayList connections = new ArrayList();
>> +
>> +    protected TransportConnector connector;
>> +    protected TransportConnector remoteConnector;
>> +
>> +    protected long idGenerator;
>> +    protected int msgIdGenerator;
>> +    protected int tempDestGenerator;
>> +    protected int maxWait = 4000;
>> +    protected String queueName = "TEST";
>> +
>> +    protected String amqDomain = "org.apache.activemq";
>> +
>> +    protected void setUp() throws Exception {
>> +
>> +        // For those who want visual confirmation:
>> +        //   Uncomment the following to enable JMX support on a port
>> number to use
>> +        //   Jconsole to view each broker. You will need to add some
>> calls to
>> +        //   Thread.sleep() to be able to actually slow things down so
>> that you
>> +        //   can manually see JMX attrs.
>> +//        System.setProperty("com.sun.management.jmxremote", "");
>> +//        System.setProperty("com.sun.management.jmxremote.port",
>> "1099");
>> +//        System.setProperty("com.sun.management.jmxremote.authenticate",
>> "false");
>> +//        System.setProperty("com.sun.management.jmxremote.ssl",
>> "false");
>> +
>> +        // Create the local broker
>> +        createBroker();
>> +        // Create the remote broker
>> +        createRemoteBroker();
>> +
>> +        // Remove the activemq-data directory from the creation of the
>> remote broker
>> +        FileUtils.deleteDirectory(new File("activemq-data"));
>>
>>         // Create a network bridge between the local and remote brokers so
>> that
>>         // demand-based forwarding can take place
>> @@ -39,79 +106,42 @@
>>         config.setDispatchAsync(false);
>>
>>         Transport localTransport = createTransport();
>> -        localTransport.setTransportListener(new TransportListener() {
>> -               Command command = null;
>> -                       public void onCommand(Object o) {
>> -                               this.command = (Command) o;
>> -                               LOG.info("Command from [" +
>> command.getFrom() + "] to [" + command.getTo() + "]");
>> -                       }
>> -
>> -                       public void onException(IOException error) {
>> -                               LOG.info("Command from [" +
>> command.getFrom() + "] to [" + command.getTo() + "]");
>> -                               LOG.info("Exception: " + error);
>> -                       }
>> -
>> -                       public void transportInterupted() {
>> -                               LOG.info("Interruption on local
>> transport");
>> -                       }
>> -
>> -                       public void transportResumed() {
>> -                               LOG.info("Resumption on local transport");
>> -                       }
>> -        });
>> -
>>         Transport remoteTransport = createRemoteTransport();
>> -        remoteTransport.setTransportListener(new TransportListener() {
>> -               Command command = null;
>> -                       public void onCommand(Object o) {
>> -                               this.command = (Command) o;
>> -                               LOG.info("Command from [" +
>> command.getFrom() + "] to [" + command.getTo() + "]");
>> -                       }
>> -
>> -                       public void onException(IOException error) {
>> -                               LOG.info("Command from [" +
>> command.getFrom() + "] to [" + command.getTo() + "]");
>> -                               LOG.info("Exception: " + error);
>> -                       }
>> -
>> -                       public void transportInterupted() {
>> -                               LOG.info("Interruption on remote
>> transport");
>> -                       }
>> -
>> -                       public void transportResumed() {
>> -                               LOG.info("Resumption on remote
>> transport");
>> -                       }
>> -        });
>>
>> +        // Create a network bridge between the two brokers
>>         bridge = new DemandForwardingBridge(config, localTransport,
>> remoteTransport);
>> -        bridge.setBrokerService(broker);
>> +        bridge.setBrokerService(localBroker);
>>         bridge.start();
>>
>> -        // Enable JMX support on the local and remote brokers
>> -//        broker.setUseJmx(true);
>> -//        remoteBroker.setUseJmx(true);
>> -
>> -        // Make sure persistence is disabled
>> -        broker.setPersistent(false);
>> -        broker.setPersistenceAdapter(null);
>> -        remoteBroker.setPersistent(false);
>> -        remoteBroker.setPersistenceAdapter(null);
>> +        waitForBridgeFormation();
>>
>> -        // Remove the activemq-data directory from the creation of the
>> remote broker
>> -        FileUtils.deleteDirectory(new File("activemq-data"));
>>     }
>> -
>> -       protected void tearDown() throws Exception {
>> +
>> +    protected void waitForBridgeFormation() throws Exception {
>> +        for (final BrokerService broker : brokers.values()) {
>> +            if (!broker.getNetworkConnectors().isEmpty()) {
>> +               // Max wait here is 30 secs
>> +                Wait.waitFor(new Wait.Condition() {
>> +                    public boolean isSatisified() throws Exception {
>> +                        return
>> !broker.getNetworkConnectors().get(0).activeBridges().isEmpty();
>> +                    }});
>> +            }
>> +        }
>> +    }
>> +
>> +    protected void tearDown() throws Exception {
>>         bridge.stop();
>> -        super.tearDown();
>> +        localBroker.stop();
>> +        remoteBroker.stop();
>>     }
>>
>> -       public void testBrokerNetworkWithStuckMessages() throws Exception
>> {
>> -
>> -               int sendNumMessages = 10;
>> -               int receiveNumMessages = 5;
>> -
>> -               // Create a producer and send a batch of 10 messages to
>> the local broker
>> -               StubConnection connection1 = createConnection();
>> +    public void testBrokerNetworkWithStuckMessages() throws Exception {
>> +
>> +        int sendNumMessages = 10;
>> +        int receiveNumMessages = 5;
>> +
>> +        // Create a producer
>> +        StubConnection connection1 = createConnection();
>>         ConnectionInfo connectionInfo1 = createConnectionInfo();
>>         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
>>         ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
>> @@ -122,25 +152,25 @@
>>         // Create a destination on the local broker
>>         ActiveMQDestination destinationInfo1 = null;
>>
>> +        // Send a 10 messages to the local broker
>>         for (int i = 0; i < sendNumMessages; ++i) {
>> -               destinationInfo1 = createDestinationInfo(connection1,
>> connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
>> -//             connection1.send(createMessage(producerInfo,
>> destinationInfo1, DeliveryMode.NON_PERSISTENT));
>> -               connection1.request(createMessage(producerInfo,
>> destinationInfo1, DeliveryMode.NON_PERSISTENT));
>> +            destinationInfo1 = createDestinationInfo(connection1,
>> connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
>> +            connection1.request(createMessage(producerInfo,
>> destinationInfo1, DeliveryMode.NON_PERSISTENT));
>>         }
>>
>>         // Ensure that there are 10 messages on the local broker
>> -        int messageCount1 = countMessagesInQueue(connection1,
>> connectionInfo1, destinationInfo1);
>> -        assertEquals(10, messageCount1);
>> +        Object[] messages = browseQueueWithJmx(localBroker);
>> +        assertEquals(sendNumMessages, messages.length);
>>
>>
>> -        // Create a consumer on the remote broker
>> +        // Create a synchronous consumer on the remote broker
>>         final StubConnection connection2 = createRemoteConnection();
>>         ConnectionInfo connectionInfo2 = createConnectionInfo();
>>         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
>>         connection2.send(connectionInfo2);
>>         connection2.send(sessionInfo2);
>>         ActiveMQDestination destinationInfo2 =
>> -               createDestinationInfo(connection2, connectionInfo2,
>> ActiveMQDestination.QUEUE_TYPE);
>> +            createDestinationInfo(connection2, connectionInfo2,
>> ActiveMQDestination.QUEUE_TYPE);
>>         final ConsumerInfo consumerInfo2 =
>> createConsumerInfo(sessionInfo2, destinationInfo2);
>>         connection2.send(consumerInfo2);
>>
>> @@ -149,32 +179,27 @@
>>         // method, this will cause the messages on the local broker to be
>>         // forwarded to the remote broker.
>>         for (int i = 0; i < receiveNumMessages; ++i) {
>> -               assertTrue("Message " + i + " was not received",
>> Wait.waitFor(new Wait.Condition() {
>> -                public boolean isSatisified() throws Exception {
>> -                               Message message1 =
>> receiveMessage(connection2);
>> -                               assertNotNull(message1);
>> -                           connection2.send(createAck(consumerInfo2,
>> message1, 1, MessageAck.STANDARD_ACK_TYPE));
>> -                    return message1 != null;
>> -                }
>> -            }));
>> -//             Message message1 = receiveMessage(connection2);
>> -//             assertNotNull(message1);
>> -//          connection2.send(createAck(consumerInfo2, message1, 1,
>> MessageAck.STANDARD_ACK_TYPE));
>> +            Message message1 = receiveMessage(connection2);
>> +            assertNotNull(message1);
>> +            connection2.send(createAck(consumerInfo2, message1, 1,
>> MessageAck.STANDARD_ACK_TYPE));
>> +
>> +            Object[] msgs1 = browseQueueWithJmx(remoteBroker);
>> +            LOG.info("Found [" + msgs1.length + "] messages with JMX");
>> +//            assertEquals((sendNumMessages-i), msgs.length);
>>         }
>>
>> -        // Close the consumer on the remote broker
>> -        connection2.send(consumerInfo2.createRemoveCommand());
>> -
>>         // Ensure that there are zero messages on the local broker. This
>> tells
>>         // us that those messages have been prefetched to the remote
>> broker
>>         // where the demand exists.
>> -        int messageCount2 = countMessagesInQueue(connection1,
>> connectionInfo1, destinationInfo1);
>> -// Sometimes it fails here
>> -        assertEquals(0, messageCount2);
>> +        messages = browseQueueWithJmx(localBroker);
>> +        assertEquals(0, messages.length);
>> +
>> +        // Close the consumer on the remote broker
>> +        connection2.send(consumerInfo2.createRemoveCommand());
>>
>>         // There should now be 5 messages stuck on the remote broker
>> -        int messageCount3 = countMessagesInQueue(connection2,
>> connectionInfo2, destinationInfo2);
>> -        assertEquals(5, messageCount3);
>> +        messages = browseQueueWithJmx(remoteBroker);
>> +        assertEquals(5, messages.length);
>>
>>         // Create a consumer on the local broker just to confirm that it
>> doesn't
>>         // receive any messages
>> @@ -182,13 +207,13 @@
>>         connection1.send(consumerInfo1);
>>         Message message1 = receiveMessage(connection1);
>>
>> -               //////////////////////////////////////////////////////
>> +        //////////////////////////////////////////////////////
>>         // An assertNull() is done here because this is currently the
>> correct
>>         // behavior. This is actually the purpose of this test - to prove
>> that
>>         // messages are stuck on the remote broker. AMQ-2324 and AMQ-2484
>> aim
>>         // to fix this situation so that messages don't get stuck.
>>         assertNull(message1);
>> -               //////////////////////////////////////////////////////
>> +        //////////////////////////////////////////////////////
>>
>>         ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo2,
>> destinationInfo2);
>>         connection2.send(consumerInfo3);
>> @@ -197,30 +222,247 @@
>>         // to clean up the queue.
>>         int counter = 0;
>>         for (int i = 0; i < receiveNumMessages; ++i) {
>> -               message1 = receiveMessage(connection2);
>> -               assertNotNull(message1);
>> +            message1 = receiveMessage(connection2);
>> +            assertNotNull(message1);
>>             connection2.send(createAck(consumerInfo3, message1, 1,
>> MessageAck.STANDARD_ACK_TYPE));
>>             ++counter;
>>         }
>>         // Ensure that 5 messages were received
>>         assertEquals(receiveNumMessages, counter);
>>
>> -        Thread.sleep(2000);
>> +        // Let those acks percolate... This stinks but it's the only way
>> currently
>> +        // because these types of internal broker actions are
>> non-deterministic.
>> +        Thread.sleep(4000);
>>
>>         // Ensure that the queue on the remote broker is empty
>> -        int messageCount4 = countMessagesInQueue(connection2,
>> connectionInfo2, destinationInfo1);
>> -// Sometimes it fails here
>> -        assertEquals(0, messageCount4);
>> +        messages = browseQueueWithJmx(remoteBroker);
>> +        assertEquals(0, messages.length);
>>
>>         // Close the consumer on the remote broker
>>         connection2.send(consumerInfo3.createRemoveCommand());
>>
>>         connection1.stop();
>>         connection2.stop();
>> +    }
>> +
>> +    protected BrokerService createBroker() throws Exception {
>> +        localBroker = new BrokerService();
>> +        localBroker.setBrokerName("localhost");
>> +        localBroker.setUseJmx(true);
>> +        localBroker.setPersistenceAdapter(null);
>> +        localBroker.setPersistent(false);
>> +        connector = createConnector();
>> +        localBroker.addConnector(connector);
>> +        localBroker.start();
>> +        localBroker.waitUntilStarted();
>> +
>> +        localBroker.getManagementContext().setConnectorPort(2221);
>> +
>> +        brokers.put(localBroker.getBrokerName(), localBroker);
>> +
>> +        return localBroker;
>> +    }
>> +
>> +    protected BrokerService createRemoteBroker() throws Exception {
>> +        remoteBroker = new BrokerService();
>> +        remoteBroker.setBrokerName("remotehost");
>> +        remoteBroker.setUseJmx(true);
>> +        remoteBroker.setPersistenceAdapter(null);
>> +        remoteBroker.setPersistent(false);
>> +        remoteConnector = createRemoteConnector();
>> +        remoteBroker.addConnector(remoteConnector);
>> +        remoteBroker.waitUntilStarted();
>> +
>> +        remoteBroker.getManagementContext().setConnectorPort(2222);
>> +
>> +        brokers.put(remoteBroker.getBrokerName(), remoteBroker);
>> +
>> +        return remoteBroker;
>> +    }
>> +
>> +    protected Transport createTransport() throws Exception {
>> +        Transport transport =
>> TransportFactory.connect(connector.getServer().getConnectURI());
>> +        return transport;
>> +    }
>> +
>> +    protected Transport createRemoteTransport() throws Exception {
>> +        Transport transport =
>> TransportFactory.connect(remoteConnector.getServer().getConnectURI());
>> +        return transport;
>> +    }
>> +
>> +    protected TransportConnector createConnector() throws Exception,
>> IOException, URISyntaxException {
>> +        return new TransportConnector(TransportFactory.bind(new
>> URI(getLocalURI())));
>> +    }
>> +
>> +    protected TransportConnector createRemoteConnector() throws
>> Exception, IOException, URISyntaxException {
>> +        return new TransportConnector(TransportFactory.bind(new
>> URI(getRemoteURI())));
>> +    }
>> +
>> +    protected String getRemoteURI() {
>> +        return "vm://remotehost";
>> +    }
>> +
>> +    protected String getLocalURI() {
>> +        return "vm://localhost";
>> +    }
>> +
>> +    protected StubConnection createConnection() throws Exception {
>> +        Transport transport =
>> TransportFactory.connect(connector.getServer().getConnectURI());
>> +        StubConnection connection = new StubConnection(transport);
>> +        connections.add(connection);
>> +        return connection;
>> +    }
>> +
>> +    protected StubConnection createRemoteConnection() throws Exception {
>> +        Transport transport =
>> TransportFactory.connect(remoteConnector.getServer().getConnectURI());
>> +        StubConnection connection = new StubConnection(transport);
>> +        connections.add(connection);
>> +        return connection;
>> +    }
>> +
>> +    @SuppressWarnings("unchecked")
>> +    private Object[] browseQueueWithJms(BrokerService broker) throws
>> Exception {
>> +               Object[] messages = null;
>> +               Connection connection = null;
>> +               Session session = null;
>> +
>> +               try {
>> +                       URI brokerUri = connector.getUri();
>> +                       ActiveMQConnectionFactory connectionFactory = new
>> ActiveMQConnectionFactory(brokerUri.toString());
>> +                       connection = connectionFactory.createConnection();
>> +                       connection.start();
>> +                       session = connection.createSession(false,
>> Session.AUTO_ACKNOWLEDGE);
>> +                       Queue destination =
>> session.createQueue(queueName);
>> +                       QueueBrowser browser =
>> session.createBrowser(destination);
>> +                       List<Message> list = new ArrayList<Message>();
>> +                       for (Enumeration<Message> enumn =
>> browser.getEnumeration(); enumn.hasMoreElements();) {
>> +                               list.add(enumn.nextElement());
>> +                       }
>> +                       messages = list.toArray();
>> +               }
>> +               finally {
>> +                       if (session != null) {
>> +                               session.close();
>> +                       }
>> +                       if (connection != null) {
>> +                               connection.close();
>> +                       }
>> +               }
>> +               LOG.info("+Browsed with JMS: " + messages.length);
>> +
>> +               return messages;
>>        }
>> -
>> -    public static Test suite() {
>> -        return suite(BrokerNetworkWithStuckMessagesTest.class);
>> +
>> +    private Object[] browseQueueWithJmx(BrokerService broker) throws
>> Exception {
>> +        Hashtable<String, String> params = new Hashtable<String,
>> String>();
>> +        params.put("BrokerName", broker.getBrokerName());
>> +        params.put("Type", "Queue");
>> +        params.put("Destination", queueName);
>> +        ObjectName queueObjectName = ObjectName.getInstance(amqDomain,
>> params);
>> +
>> +        ManagementContext mgmtCtx = broker.getManagementContext();
>> +        MBeanServer mbs = mgmtCtx.getMBeanServer();
>> +        Object[] messages = (Object[]) mbs.invoke(queueObjectName,
>> "browse", new Object[0], new String[0]);
>> +
>> +               LOG.info("+Browsed with JMX: " + messages.length);
>> +
>> +        return messages;
>> +    }
>> +
>> +    protected ConnectionInfo createConnectionInfo() throws Exception {
>> +        ConnectionInfo info = new ConnectionInfo();
>> +        info.setConnectionId(new ConnectionId("connection:" +
>> (++idGenerator)));
>> +        info.setClientId(info.getConnectionId().getValue());
>> +        return info;
>> +    }
>> +
>> +    protected SessionInfo createSessionInfo(ConnectionInfo
>> connectionInfo) throws Exception {
>> +        SessionInfo info = new SessionInfo(connectionInfo,
>> ++idGenerator);
>> +        return info;
>> +    }
>> +
>> +    protected ProducerInfo createProducerInfo(SessionInfo sessionInfo)
>> throws Exception {
>> +        ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
>> +        return info;
>> +    }
>> +
>> +    protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo,
>> ActiveMQDestination destination) throws Exception {
>> +        ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
>> +        info.setBrowser(false);
>> +        info.setDestination(destination);
>> +        info.setPrefetchSize(1000);
>> +        info.setDispatchAsync(false);
>> +        return info;
>> +    }
>> +
>> +    protected DestinationInfo createTempDestinationInfo(ConnectionInfo
>> connectionInfo, byte destinationType) {
>> +        DestinationInfo info = new DestinationInfo();
>> +        info.setConnectionId(connectionInfo.getConnectionId());
>> +        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
>> +
>>  info.setDestination(ActiveMQDestination.createDestination(info.getConnectionId()
>> + ":" + (++tempDestGenerator), destinationType));
>> +        return info;
>> +    }
>> +
>> +    protected ActiveMQDestination createDestinationInfo(StubConnection
>> connection, ConnectionInfo connectionInfo1, byte destinationType) throws
>> Exception {
>> +        if ((destinationType & ActiveMQDestination.TEMP_MASK) != 0) {
>> +            DestinationInfo info =
>> createTempDestinationInfo(connectionInfo1, destinationType);
>> +            connection.send(info);
>> +            return info.getDestination();
>> +        } else {
>> +            return ActiveMQDestination.createDestination(queueName,
>> destinationType);
>> +        }
>> +    }
>> +
>> +    protected Message createMessage(ProducerInfo producerInfo,
>> ActiveMQDestination destination, int deliveryMode) {
>> +        Message message = createMessage(producerInfo, destination);
>> +        message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT);
>> +        return message;
>> +    }
>> +
>> +    protected Message createMessage(ProducerInfo producerInfo,
>> ActiveMQDestination destination) {
>> +        ActiveMQTextMessage message = new ActiveMQTextMessage();
>> +        message.setMessageId(new MessageId(producerInfo,
>> ++msgIdGenerator));
>> +        message.setDestination(destination);
>> +        message.setPersistent(false);
>> +        try {
>> +            message.setText("Test Message Payload.");
>> +        } catch (MessageNotWriteableException e) {
>> +        }
>> +        return message;
>> +    }
>> +
>> +    protected MessageAck createAck(ConsumerInfo consumerInfo, Message
>> msg, int count, byte ackType) {
>> +        MessageAck ack = new MessageAck();
>> +        ack.setAckType(ackType);
>> +        ack.setConsumerId(consumerInfo.getConsumerId());
>> +        ack.setDestination(msg.getDestination());
>> +        ack.setLastMessageId(msg.getMessageId());
>> +        ack.setMessageCount(count);
>> +        return ack;
>> +    }
>> +
>> +    public Message receiveMessage(StubConnection connection) throws
>> InterruptedException {
>> +        return receiveMessage(connection, maxWait);
>> +    }
>> +
>> +    public Message receiveMessage(StubConnection connection, long
>> timeout) throws InterruptedException {
>> +        while (true) {
>> +            Object o = connection.getDispatchQueue().poll(timeout,
>> TimeUnit.MILLISECONDS);
>> +
>> +            if (o == null) {
>> +                return null;
>> +            }
>> +            if (o instanceof MessageDispatch) {
>> +
>> +                MessageDispatch dispatch = (MessageDispatch)o;
>> +                if (dispatch.getMessage() == null) {
>> +                    return null;
>> +                }
>> +                dispatch.setMessage(dispatch.getMessage().copy());
>> +
>>  dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
>> +                return dispatch.getMessage();
>> +            }
>> +        }
>>     }
>>
>>  }
>>
>>
>>
>
>
> --
> http://blog.garytully.com
>
> Open Source Integration
> http://fusesource.com
>



-- 
http://blog.garytully.com

Open Source Integration
http://fusesource.com

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message