activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gary Tully <gary.tu...@gmail.com>
Subject Re: svn commit: r883411 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
Date Mon, 23 Nov 2009 18:21:24 GMT
this breaks the build, getMBeanServer() is intentionally protected in
ManagementContext.

>
activemq_t/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java:[364,33]
getMBeanServer() has protected access in
org.apache.activemq.broker.jmx.ManagementContext

2009/11/23 <bsnyder@apache.org>

> Author: bsnyder
> Date: Mon Nov 23 17:02:39 2009
> New Revision: 883411
>
> URL: http://svn.apache.org/viewvc?rev=883411&view=rev
> Log:
> Updated test for AMQ-2324 and AMQ-2484
>
> Modified:
>
>  activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
>
> Modified:
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java?rev=883411&r1=883410&r2=883411&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
> (original)
> +++
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/BrokerNetworkWithStuckMessagesTest.java
> Mon Nov 23 17:02:39 2009
> @@ -2,35 +2,102 @@
>
>  import java.io.File;
>  import java.io.IOException;
> +import java.net.URI;
> +import java.net.URISyntaxException;
> +import java.util.ArrayList;
> +import java.util.Enumeration;
> +import java.util.HashMap;
> +import java.util.Hashtable;
> +import java.util.List;
> +import java.util.Map;
> +import java.util.concurrent.TimeUnit;
>
> +import javax.jms.Connection;
>  import javax.jms.DeliveryMode;
> -
> -import junit.framework.Test;
> -
> +import javax.jms.MessageNotWriteableException;
> +import javax.jms.Queue;
> +import javax.jms.QueueBrowser;
> +import javax.jms.Session;
> +import javax.management.MBeanServer;
> +import javax.management.ObjectName;
> +
> +import junit.framework.TestCase;
> +
> +import org.apache.activemq.ActiveMQConnectionFactory;
> +import org.apache.activemq.broker.BrokerService;
> +import org.apache.activemq.broker.BrokerTestSupport;
>  import org.apache.activemq.broker.StubConnection;
> +import org.apache.activemq.broker.TransportConnector;
> +import org.apache.activemq.broker.jmx.ManagementContext;
>  import org.apache.activemq.command.ActiveMQDestination;
> -import org.apache.activemq.command.Command;
> +import org.apache.activemq.command.ActiveMQTextMessage;
> +import org.apache.activemq.command.ConnectionId;
>  import org.apache.activemq.command.ConnectionInfo;
>  import org.apache.activemq.command.ConsumerInfo;
> +import org.apache.activemq.command.DestinationInfo;
>  import org.apache.activemq.command.Message;
>  import org.apache.activemq.command.MessageAck;
> +import org.apache.activemq.command.MessageDispatch;
> +import org.apache.activemq.command.MessageId;
>  import org.apache.activemq.command.ProducerInfo;
>  import org.apache.activemq.command.SessionInfo;
>  import org.apache.activemq.transport.Transport;
> -import org.apache.activemq.transport.TransportListener;
> +import org.apache.activemq.transport.TransportFactory;
>  import org.apache.activemq.util.Wait;
>  import org.apache.commons.io.FileUtils;
>  import org.apache.commons.logging.Log;
>  import org.apache.commons.logging.LogFactory;
>
> -public class BrokerNetworkWithStuckMessagesTest extends NetworkTestSupport
> {
> -
> +/**
> + * This class duplicates most of the functionality in {@link
> NetworkTestSupport}
> + * and {@link BrokerTestSupport} because more control was needed over how
> brokers
> + * and connectors are created. Also, this test asserts message counts via
> JMX on
> + * each broker.
> + *
> + * @author bsnyder
> + *
> + */
> +public class BrokerNetworkWithStuckMessagesTest extends TestCase
> /*NetworkTestSupport*/ {
> +
>     private static final Log LOG =
> LogFactory.getLog(BrokerNetworkWithStuckMessagesTest.class);
> -
> -       private DemandForwardingBridge bridge;
> -
> -       protected void setUp() throws Exception {
> -        super.setUp();
> +
> +    private BrokerService localBroker;
> +    private BrokerService remoteBroker;
> +    private DemandForwardingBridge bridge;
> +
> +    protected Map<String, BrokerService> brokers = new HashMap<String,
> BrokerService>();
> +    protected ArrayList connections = new ArrayList();
> +
> +    protected TransportConnector connector;
> +    protected TransportConnector remoteConnector;
> +
> +    protected long idGenerator;
> +    protected int msgIdGenerator;
> +    protected int tempDestGenerator;
> +    protected int maxWait = 4000;
> +    protected String queueName = "TEST";
> +
> +    protected String amqDomain = "org.apache.activemq";
> +
> +    protected void setUp() throws Exception {
> +
> +        // For those who want visual confirmation:
> +        //   Uncomment the following to enable JMX support on a port
> number to use
> +        //   Jconsole to view each broker. You will need to add some calls
> to
> +        //   Thread.sleep() to be able to actually slow things down so
> that you
> +        //   can manually see JMX attrs.
> +//        System.setProperty("com.sun.management.jmxremote", "");
> +//        System.setProperty("com.sun.management.jmxremote.port", "1099");
> +//        System.setProperty("com.sun.management.jmxremote.authenticate",
> "false");
> +//        System.setProperty("com.sun.management.jmxremote.ssl", "false");
> +
> +        // Create the local broker
> +        createBroker();
> +        // Create the remote broker
> +        createRemoteBroker();
> +
> +        // Remove the activemq-data directory from the creation of the
> remote broker
> +        FileUtils.deleteDirectory(new File("activemq-data"));
>
>         // Create a network bridge between the local and remote brokers so
> that
>         // demand-based forwarding can take place
> @@ -39,79 +106,42 @@
>         config.setDispatchAsync(false);
>
>         Transport localTransport = createTransport();
> -        localTransport.setTransportListener(new TransportListener() {
> -               Command command = null;
> -                       public void onCommand(Object o) {
> -                               this.command = (Command) o;
> -                               LOG.info("Command from [" +
> command.getFrom() + "] to [" + command.getTo() + "]");
> -                       }
> -
> -                       public void onException(IOException error) {
> -                               LOG.info("Command from [" +
> command.getFrom() + "] to [" + command.getTo() + "]");
> -                               LOG.info("Exception: " + error);
> -                       }
> -
> -                       public void transportInterupted() {
> -                               LOG.info("Interruption on local
> transport");
> -                       }
> -
> -                       public void transportResumed() {
> -                               LOG.info("Resumption on local transport");
> -                       }
> -        });
> -
>         Transport remoteTransport = createRemoteTransport();
> -        remoteTransport.setTransportListener(new TransportListener() {
> -               Command command = null;
> -                       public void onCommand(Object o) {
> -                               this.command = (Command) o;
> -                               LOG.info("Command from [" +
> command.getFrom() + "] to [" + command.getTo() + "]");
> -                       }
> -
> -                       public void onException(IOException error) {
> -                               LOG.info("Command from [" +
> command.getFrom() + "] to [" + command.getTo() + "]");
> -                               LOG.info("Exception: " + error);
> -                       }
> -
> -                       public void transportInterupted() {
> -                               LOG.info("Interruption on remote
> transport");
> -                       }
> -
> -                       public void transportResumed() {
> -                               LOG.info("Resumption on remote transport");
> -                       }
> -        });
>
> +        // Create a network bridge between the two brokers
>         bridge = new DemandForwardingBridge(config, localTransport,
> remoteTransport);
> -        bridge.setBrokerService(broker);
> +        bridge.setBrokerService(localBroker);
>         bridge.start();
>
> -        // Enable JMX support on the local and remote brokers
> -//        broker.setUseJmx(true);
> -//        remoteBroker.setUseJmx(true);
> -
> -        // Make sure persistence is disabled
> -        broker.setPersistent(false);
> -        broker.setPersistenceAdapter(null);
> -        remoteBroker.setPersistent(false);
> -        remoteBroker.setPersistenceAdapter(null);
> +        waitForBridgeFormation();
>
> -        // Remove the activemq-data directory from the creation of the
> remote broker
> -        FileUtils.deleteDirectory(new File("activemq-data"));
>     }
> -
> -       protected void tearDown() throws Exception {
> +
> +    protected void waitForBridgeFormation() throws Exception {
> +        for (final BrokerService broker : brokers.values()) {
> +            if (!broker.getNetworkConnectors().isEmpty()) {
> +               // Max wait here is 30 secs
> +                Wait.waitFor(new Wait.Condition() {
> +                    public boolean isSatisified() throws Exception {
> +                        return
> !broker.getNetworkConnectors().get(0).activeBridges().isEmpty();
> +                    }});
> +            }
> +        }
> +    }
> +
> +    protected void tearDown() throws Exception {
>         bridge.stop();
> -        super.tearDown();
> +        localBroker.stop();
> +        remoteBroker.stop();
>     }
>
> -       public void testBrokerNetworkWithStuckMessages() throws Exception {
> -
> -               int sendNumMessages = 10;
> -               int receiveNumMessages = 5;
> -
> -               // Create a producer and send a batch of 10 messages to the
> local broker
> -               StubConnection connection1 = createConnection();
> +    public void testBrokerNetworkWithStuckMessages() throws Exception {
> +
> +        int sendNumMessages = 10;
> +        int receiveNumMessages = 5;
> +
> +        // Create a producer
> +        StubConnection connection1 = createConnection();
>         ConnectionInfo connectionInfo1 = createConnectionInfo();
>         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
>         ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
> @@ -122,25 +152,25 @@
>         // Create a destination on the local broker
>         ActiveMQDestination destinationInfo1 = null;
>
> +        // Send a 10 messages to the local broker
>         for (int i = 0; i < sendNumMessages; ++i) {
> -               destinationInfo1 = createDestinationInfo(connection1,
> connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
> -//             connection1.send(createMessage(producerInfo,
> destinationInfo1, DeliveryMode.NON_PERSISTENT));
> -               connection1.request(createMessage(producerInfo,
> destinationInfo1, DeliveryMode.NON_PERSISTENT));
> +            destinationInfo1 = createDestinationInfo(connection1,
> connectionInfo1, ActiveMQDestination.QUEUE_TYPE);
> +            connection1.request(createMessage(producerInfo,
> destinationInfo1, DeliveryMode.NON_PERSISTENT));
>         }
>
>         // Ensure that there are 10 messages on the local broker
> -        int messageCount1 = countMessagesInQueue(connection1,
> connectionInfo1, destinationInfo1);
> -        assertEquals(10, messageCount1);
> +        Object[] messages = browseQueueWithJmx(localBroker);
> +        assertEquals(sendNumMessages, messages.length);
>
>
> -        // Create a consumer on the remote broker
> +        // Create a synchronous consumer on the remote broker
>         final StubConnection connection2 = createRemoteConnection();
>         ConnectionInfo connectionInfo2 = createConnectionInfo();
>         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
>         connection2.send(connectionInfo2);
>         connection2.send(sessionInfo2);
>         ActiveMQDestination destinationInfo2 =
> -               createDestinationInfo(connection2, connectionInfo2,
> ActiveMQDestination.QUEUE_TYPE);
> +            createDestinationInfo(connection2, connectionInfo2,
> ActiveMQDestination.QUEUE_TYPE);
>         final ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2,
> destinationInfo2);
>         connection2.send(consumerInfo2);
>
> @@ -149,32 +179,27 @@
>         // method, this will cause the messages on the local broker to be
>         // forwarded to the remote broker.
>         for (int i = 0; i < receiveNumMessages; ++i) {
> -               assertTrue("Message " + i + " was not received",
> Wait.waitFor(new Wait.Condition() {
> -                public boolean isSatisified() throws Exception {
> -                               Message message1 =
> receiveMessage(connection2);
> -                               assertNotNull(message1);
> -                           connection2.send(createAck(consumerInfo2,
> message1, 1, MessageAck.STANDARD_ACK_TYPE));
> -                    return message1 != null;
> -                }
> -            }));
> -//             Message message1 = receiveMessage(connection2);
> -//             assertNotNull(message1);
> -//          connection2.send(createAck(consumerInfo2, message1, 1,
> MessageAck.STANDARD_ACK_TYPE));
> +            Message message1 = receiveMessage(connection2);
> +            assertNotNull(message1);
> +            connection2.send(createAck(consumerInfo2, message1, 1,
> MessageAck.STANDARD_ACK_TYPE));
> +
> +            Object[] msgs1 = browseQueueWithJmx(remoteBroker);
> +            LOG.info("Found [" + msgs1.length + "] messages with JMX");
> +//            assertEquals((sendNumMessages-i), msgs.length);
>         }
>
> -        // Close the consumer on the remote broker
> -        connection2.send(consumerInfo2.createRemoveCommand());
> -
>         // Ensure that there are zero messages on the local broker. This
> tells
>         // us that those messages have been prefetched to the remote broker
>         // where the demand exists.
> -        int messageCount2 = countMessagesInQueue(connection1,
> connectionInfo1, destinationInfo1);
> -// Sometimes it fails here
> -        assertEquals(0, messageCount2);
> +        messages = browseQueueWithJmx(localBroker);
> +        assertEquals(0, messages.length);
> +
> +        // Close the consumer on the remote broker
> +        connection2.send(consumerInfo2.createRemoveCommand());
>
>         // There should now be 5 messages stuck on the remote broker
> -        int messageCount3 = countMessagesInQueue(connection2,
> connectionInfo2, destinationInfo2);
> -        assertEquals(5, messageCount3);
> +        messages = browseQueueWithJmx(remoteBroker);
> +        assertEquals(5, messages.length);
>
>         // Create a consumer on the local broker just to confirm that it
> doesn't
>         // receive any messages
> @@ -182,13 +207,13 @@
>         connection1.send(consumerInfo1);
>         Message message1 = receiveMessage(connection1);
>
> -               //////////////////////////////////////////////////////
> +        //////////////////////////////////////////////////////
>         // An assertNull() is done here because this is currently the
> correct
>         // behavior. This is actually the purpose of this test - to prove
> that
>         // messages are stuck on the remote broker. AMQ-2324 and AMQ-2484
> aim
>         // to fix this situation so that messages don't get stuck.
>         assertNull(message1);
> -               //////////////////////////////////////////////////////
> +        //////////////////////////////////////////////////////
>
>         ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo2,
> destinationInfo2);
>         connection2.send(consumerInfo3);
> @@ -197,30 +222,247 @@
>         // to clean up the queue.
>         int counter = 0;
>         for (int i = 0; i < receiveNumMessages; ++i) {
> -               message1 = receiveMessage(connection2);
> -               assertNotNull(message1);
> +            message1 = receiveMessage(connection2);
> +            assertNotNull(message1);
>             connection2.send(createAck(consumerInfo3, message1, 1,
> MessageAck.STANDARD_ACK_TYPE));
>             ++counter;
>         }
>         // Ensure that 5 messages were received
>         assertEquals(receiveNumMessages, counter);
>
> -        Thread.sleep(2000);
> +        // Let those acks percolate... This stinks but it's the only way
> currently
> +        // because these types of internal broker actions are
> non-deterministic.
> +        Thread.sleep(4000);
>
>         // Ensure that the queue on the remote broker is empty
> -        int messageCount4 = countMessagesInQueue(connection2,
> connectionInfo2, destinationInfo1);
> -// Sometimes it fails here
> -        assertEquals(0, messageCount4);
> +        messages = browseQueueWithJmx(remoteBroker);
> +        assertEquals(0, messages.length);
>
>         // Close the consumer on the remote broker
>         connection2.send(consumerInfo3.createRemoveCommand());
>
>         connection1.stop();
>         connection2.stop();
> +    }
> +
> +    protected BrokerService createBroker() throws Exception {
> +        localBroker = new BrokerService();
> +        localBroker.setBrokerName("localhost");
> +        localBroker.setUseJmx(true);
> +        localBroker.setPersistenceAdapter(null);
> +        localBroker.setPersistent(false);
> +        connector = createConnector();
> +        localBroker.addConnector(connector);
> +        localBroker.start();
> +        localBroker.waitUntilStarted();
> +
> +        localBroker.getManagementContext().setConnectorPort(2221);
> +
> +        brokers.put(localBroker.getBrokerName(), localBroker);
> +
> +        return localBroker;
> +    }
> +
> +    protected BrokerService createRemoteBroker() throws Exception {
> +        remoteBroker = new BrokerService();
> +        remoteBroker.setBrokerName("remotehost");
> +        remoteBroker.setUseJmx(true);
> +        remoteBroker.setPersistenceAdapter(null);
> +        remoteBroker.setPersistent(false);
> +        remoteConnector = createRemoteConnector();
> +        remoteBroker.addConnector(remoteConnector);
> +        remoteBroker.waitUntilStarted();
> +
> +        remoteBroker.getManagementContext().setConnectorPort(2222);
> +
> +        brokers.put(remoteBroker.getBrokerName(), remoteBroker);
> +
> +        return remoteBroker;
> +    }
> +
> +    protected Transport createTransport() throws Exception {
> +        Transport transport =
> TransportFactory.connect(connector.getServer().getConnectURI());
> +        return transport;
> +    }
> +
> +    protected Transport createRemoteTransport() throws Exception {
> +        Transport transport =
> TransportFactory.connect(remoteConnector.getServer().getConnectURI());
> +        return transport;
> +    }
> +
> +    protected TransportConnector createConnector() throws Exception,
> IOException, URISyntaxException {
> +        return new TransportConnector(TransportFactory.bind(new
> URI(getLocalURI())));
> +    }
> +
> +    protected TransportConnector createRemoteConnector() throws Exception,
> IOException, URISyntaxException {
> +        return new TransportConnector(TransportFactory.bind(new
> URI(getRemoteURI())));
> +    }
> +
> +    protected String getRemoteURI() {
> +        return "vm://remotehost";
> +    }
> +
> +    protected String getLocalURI() {
> +        return "vm://localhost";
> +    }
> +
> +    protected StubConnection createConnection() throws Exception {
> +        Transport transport =
> TransportFactory.connect(connector.getServer().getConnectURI());
> +        StubConnection connection = new StubConnection(transport);
> +        connections.add(connection);
> +        return connection;
> +    }
> +
> +    protected StubConnection createRemoteConnection() throws Exception {
> +        Transport transport =
> TransportFactory.connect(remoteConnector.getServer().getConnectURI());
> +        StubConnection connection = new StubConnection(transport);
> +        connections.add(connection);
> +        return connection;
> +    }
> +
> +    @SuppressWarnings("unchecked")
> +    private Object[] browseQueueWithJms(BrokerService broker) throws
> Exception {
> +               Object[] messages = null;
> +               Connection connection = null;
> +               Session session = null;
> +
> +               try {
> +                       URI brokerUri = connector.getUri();
> +                       ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(brokerUri.toString());
> +                       connection = connectionFactory.createConnection();
> +                       connection.start();
> +                       session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> +                       Queue destination = session.createQueue(queueName);
> +                       QueueBrowser browser =
> session.createBrowser(destination);
> +                       List<Message> list = new ArrayList<Message>();
> +                       for (Enumeration<Message> enumn =
> browser.getEnumeration(); enumn.hasMoreElements();) {
> +                               list.add(enumn.nextElement());
> +                       }
> +                       messages = list.toArray();
> +               }
> +               finally {
> +                       if (session != null) {
> +                               session.close();
> +                       }
> +                       if (connection != null) {
> +                               connection.close();
> +                       }
> +               }
> +               LOG.info("+Browsed with JMS: " + messages.length);
> +
> +               return messages;
>        }
> -
> -    public static Test suite() {
> -        return suite(BrokerNetworkWithStuckMessagesTest.class);
> +
> +    private Object[] browseQueueWithJmx(BrokerService broker) throws
> Exception {
> +        Hashtable<String, String> params = new Hashtable<String,
> String>();
> +        params.put("BrokerName", broker.getBrokerName());
> +        params.put("Type", "Queue");
> +        params.put("Destination", queueName);
> +        ObjectName queueObjectName = ObjectName.getInstance(amqDomain,
> params);
> +
> +        ManagementContext mgmtCtx = broker.getManagementContext();
> +        MBeanServer mbs = mgmtCtx.getMBeanServer();
> +        Object[] messages = (Object[]) mbs.invoke(queueObjectName,
> "browse", new Object[0], new String[0]);
> +
> +               LOG.info("+Browsed with JMX: " + messages.length);
> +
> +        return messages;
> +    }
> +
> +    protected ConnectionInfo createConnectionInfo() throws Exception {
> +        ConnectionInfo info = new ConnectionInfo();
> +        info.setConnectionId(new ConnectionId("connection:" +
> (++idGenerator)));
> +        info.setClientId(info.getConnectionId().getValue());
> +        return info;
> +    }
> +
> +    protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo)
> throws Exception {
> +        SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
> +        return info;
> +    }
> +
> +    protected ProducerInfo createProducerInfo(SessionInfo sessionInfo)
> throws Exception {
> +        ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
> +        return info;
> +    }
> +
> +    protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo,
> ActiveMQDestination destination) throws Exception {
> +        ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
> +        info.setBrowser(false);
> +        info.setDestination(destination);
> +        info.setPrefetchSize(1000);
> +        info.setDispatchAsync(false);
> +        return info;
> +    }
> +
> +    protected DestinationInfo createTempDestinationInfo(ConnectionInfo
> connectionInfo, byte destinationType) {
> +        DestinationInfo info = new DestinationInfo();
> +        info.setConnectionId(connectionInfo.getConnectionId());
> +        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
> +
>  info.setDestination(ActiveMQDestination.createDestination(info.getConnectionId()
> + ":" + (++tempDestGenerator), destinationType));
> +        return info;
> +    }
> +
> +    protected ActiveMQDestination createDestinationInfo(StubConnection
> connection, ConnectionInfo connectionInfo1, byte destinationType) throws
> Exception {
> +        if ((destinationType & ActiveMQDestination.TEMP_MASK) != 0) {
> +            DestinationInfo info =
> createTempDestinationInfo(connectionInfo1, destinationType);
> +            connection.send(info);
> +            return info.getDestination();
> +        } else {
> +            return ActiveMQDestination.createDestination(queueName,
> destinationType);
> +        }
> +    }
> +
> +    protected Message createMessage(ProducerInfo producerInfo,
> ActiveMQDestination destination, int deliveryMode) {
> +        Message message = createMessage(producerInfo, destination);
> +        message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT);
> +        return message;
> +    }
> +
> +    protected Message createMessage(ProducerInfo producerInfo,
> ActiveMQDestination destination) {
> +        ActiveMQTextMessage message = new ActiveMQTextMessage();
> +        message.setMessageId(new MessageId(producerInfo,
> ++msgIdGenerator));
> +        message.setDestination(destination);
> +        message.setPersistent(false);
> +        try {
> +            message.setText("Test Message Payload.");
> +        } catch (MessageNotWriteableException e) {
> +        }
> +        return message;
> +    }
> +
> +    protected MessageAck createAck(ConsumerInfo consumerInfo, Message msg,
> int count, byte ackType) {
> +        MessageAck ack = new MessageAck();
> +        ack.setAckType(ackType);
> +        ack.setConsumerId(consumerInfo.getConsumerId());
> +        ack.setDestination(msg.getDestination());
> +        ack.setLastMessageId(msg.getMessageId());
> +        ack.setMessageCount(count);
> +        return ack;
> +    }
> +
> +    public Message receiveMessage(StubConnection connection) throws
> InterruptedException {
> +        return receiveMessage(connection, maxWait);
> +    }
> +
> +    public Message receiveMessage(StubConnection connection, long timeout)
> throws InterruptedException {
> +        while (true) {
> +            Object o = connection.getDispatchQueue().poll(timeout,
> TimeUnit.MILLISECONDS);
> +
> +            if (o == null) {
> +                return null;
> +            }
> +            if (o instanceof MessageDispatch) {
> +
> +                MessageDispatch dispatch = (MessageDispatch)o;
> +                if (dispatch.getMessage() == null) {
> +                    return null;
> +                }
> +                dispatch.setMessage(dispatch.getMessage().copy());
> +
>  dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
> +                return dispatch.getMessage();
> +            }
> +        }
>     }
>
>  }
>
>
>


-- 
http://blog.garytully.com

Open Source Integration
http://fusesource.com

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message