activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r356524 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/activemq/advisory/ test/java/org/activemq/advisory/
Date Tue, 13 Dec 2005 16:21:44 GMT
Author: jstrachan
Date: Tue Dec 13 08:21:32 2005
New Revision: 356524

URL: http://svn.apache.org/viewcvs?rev=356524&view=rev
Log:
added a ConsumerListener so that you can listen to consumers coming and going easily (hiding
the details of the Advisories behind a simple Bean API) as well as be notified on exactly
how many consumers there are to be able to drive demand based publishing etc

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEvent.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEventSource.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerListener.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStartedEvent.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStoppedEvent.java   (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/advisory/
    incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/advisory/ConsumerListenerTest.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/package.html

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java?rev=356524&r1=356523&r2=356524&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java Tue Dec 13 08:21:32 2005
@@ -74,12 +74,12 @@
 
     public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Throwable {
         next.addConsumer(context, info);
-        
+
         // Don't advise advisory topics.
         if( !AdvisorySupport.isAdvisoryTopic(info.getDestination()) ) { 
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
-            fireAdvisory(context, topic, info);
             consumers.put(info.getConsumerId(), info);
+            fireConsumerAdvisory(context, topic, info);
         } else {
             
             // We need to replay all the previously collected state objects 
@@ -118,7 +118,7 @@
                 for (Iterator iter = consumers.values().iterator(); iter.hasNext();) {
                     ConsumerInfo value = (ConsumerInfo) iter.next();
                     ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
-                    fireAdvisory(context, topic, value, info.getConsumerId());
+                    fireConsumerAdvisory(context, topic, value);
                 }
             }
         }
@@ -168,8 +168,8 @@
         // Don't advise advisory topics.
         if( !AdvisorySupport.isAdvisoryTopic(info.getDestination()) ) { 
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
-            fireAdvisory(context, topic, info.createRemoveCommand());
             consumers.remove(info.getConsumerId());
+            fireConsumerAdvisory(context, topic, info.createRemoveCommand());
         }
     }
 
@@ -184,13 +184,22 @@
         }
     }
     
-    private void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Throwable {
+    protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Throwable {
         fireAdvisory(context, topic, command, null);
     }
     
-    private void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Throwable {
-        
+    protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Throwable {
         ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+        fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
+    }
+    
+    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Throwable {
+        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+        advisoryMessage.setIntProperty("consumerCount", consumers.size());
+        fireAdvisory(context, topic, command, null, advisoryMessage);
+    }
+
+    protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Throwable {
         advisoryMessage.setDataStructure(command);
         advisoryMessage.setPersistent(false);
         advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
@@ -208,7 +217,6 @@
         } finally {
             context.setProducerFlowControl(originalFlowControl);
         }
-        
     }
 
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEvent.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEvent.java?rev=356524&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEvent.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEvent.java Tue Dec 13 08:21:32 2005
@@ -0,0 +1,70 @@
+/**
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.activemq.advisory;
+
+import org.activemq.command.ConsumerId;
+
+import javax.jms.Destination;
+
+import java.util.EventObject;
+
+/**
+ * An event when the number of consumers on a given destination changes.
+ * 
+ * @version $Revision$
+ */
+public abstract class ConsumerEvent extends EventObject {
+    private static final long serialVersionUID = 2442156576867593780L;
+    private final Destination destination;
+    private final ConsumerId consumerId;
+    private final int consumerCount;
+
+    public ConsumerEvent(ConsumerEventSource source, Destination destination, ConsumerId consumerId, int consumerCount) {
+        super(source);
+        this.destination = destination;
+        this.consumerId = consumerId;
+        this.consumerCount = consumerCount;
+    }
+
+    public ConsumerEventSource getAdvisor() {
+        return (ConsumerEventSource) getSource();
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    /**
+     * Returns the current number of consumers active at the time this advisory was sent.
+     * 
+     * Note that this is not the number of consumers active when the consumer started consuming.
+     * It is usually more vital to know how many consumers there are now - rather than historically
+     * how many there were when a consumer started. So if you create a {@link ConsumerListener}
+     * after many consumers have started, you will receive a ConsumerEvent for each consumer. However the
+     * {@link #getConsumerCount()} method will always return the current active consumer count on each event.
+     */
+    public int getConsumerCount() {
+        return consumerCount;
+    }
+
+    public ConsumerId getConsumerId() {
+        return consumerId;
+    }
+
+    public abstract boolean isStarted();
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEvent.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEvent.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEventSource.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEventSource.java?rev=356524&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEventSource.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEventSource.java Tue Dec 13 08:21:32 2005
@@ -0,0 +1,137 @@
+/**
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.activemq.advisory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+import org.activemq.Service;
+import org.activemq.command.ActiveMQDestination;
+import org.activemq.command.ActiveMQMessage;
+import org.activemq.command.ActiveMQTopic;
+import org.activemq.command.ConsumerId;
+import org.activemq.command.ConsumerInfo;
+import org.activemq.command.RemoveInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+/**
+ * An object which can be used to listen to the number of active consumers
+ * available on a given destination.
+ * 
+ * @version $Revision$
+ */
+public class ConsumerEventSource implements Service, MessageListener {
+    private static final Log log = LogFactory.getLog(ConsumerEventSource.class);
+
+    private final Connection connection;
+    private final ActiveMQDestination destination;
+    private ConsumerListener listener;
+    private AtomicBoolean started = new AtomicBoolean(false);
+    private AtomicInteger consumerCount = new AtomicInteger();
+    private Session session;
+    private MessageConsumer consumer;
+
+    public ConsumerEventSource(Connection connection, Destination destination) throws JMSException {
+        this.connection = connection;
+        this.destination = ActiveMQDestination.transform(destination);
+    }
+
+    public void setConsumerListener(ConsumerListener listener) {
+        this.listener = listener;
+    }
+
+    public void start() throws Exception {
+        if (started.compareAndSet(false, true)) {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(destination);
+            consumer = session.createConsumer(advisoryTopic);
+            consumer.setMessageListener(this);
+        }
+    }
+
+    public void stop() throws Exception {
+        if (started.compareAndSet(true, false)) {
+            if (session != null) {
+                session.close();
+            }
+        }
+    }
+
+    public void onMessage(Message message) {
+        if (message instanceof ActiveMQMessage) {
+            ActiveMQMessage activeMessage = (ActiveMQMessage) message;
+            Object command = activeMessage.getDataStructure();
+            int count = 0;
+            if (command instanceof ConsumerInfo) {
+                count = consumerCount.incrementAndGet();
+                count = extractConsumerCountFromMessage(message, count);
+                fireConsumerEvent(new ConsumerStartedEvent(this, destination, (ConsumerInfo) command, count));
+            }
+            else if (command instanceof RemoveInfo) {
+                RemoveInfo removeInfo = (RemoveInfo) command;
+                if (removeInfo.isConsumerRemove()) {
+                    count = consumerCount.decrementAndGet();
+                    count = extractConsumerCountFromMessage(message, count);
+                    fireConsumerEvent(new ConsumerStoppedEvent(this, destination, (ConsumerId) removeInfo.getObjectId(), count));
+                }
+            }
+            else {
+                log.warn("Unknown command: " + command);
+            }
+        }
+        else {
+            log.warn("Unknown message type: " + message + ". Message ignored");
+        }
+    }
+
+    /**
+     * Lets rely by default on the broker telling us what the consumer count is
+     * as it can ensure that we are up to date at all times and have not
+     * received messages out of order etc.
+     */
+    protected int extractConsumerCountFromMessage(Message message, int count) {
+        try {
+            Object value = message.getObjectProperty("consumerCount");
+            if (value instanceof Number) {
+                Number n = (Number) value;
+                return n.intValue();
+            }
+            log.warn("No consumerCount header available on the message: " + message);
+        }
+        catch (Exception e) {
+            log.warn("Failed to extract consumerCount from message: " + message + ".Reason: " + e, e);
+        }
+        return count;
+    }
+
+    protected void fireConsumerEvent(ConsumerEvent event) {
+        if (listener != null) {
+            listener.onConsumerEvent(event);
+        }
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEventSource.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEventSource.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerEventSource.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerListener.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerListener.java?rev=356524&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerListener.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerListener.java Tue Dec 13 08:21:32 2005
@@ -0,0 +1,28 @@
+/**
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.activemq.advisory;
+
+/**
+ * Listen to the changes in the number of active consumers available for a given destination.
+ * 
+ * @version $Revision$
+ */
+public interface ConsumerListener {
+
+    public void onConsumerEvent(ConsumerEvent event);
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerListener.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerListener.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStartedEvent.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStartedEvent.java?rev=356524&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStartedEvent.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStartedEvent.java Tue Dec 13 08:21:32 2005
@@ -0,0 +1,49 @@
+/**
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.activemq.advisory;
+
+import org.activemq.command.ActiveMQDestination;
+import org.activemq.command.ConsumerInfo;
+
+/**
+ * An event when a new consumer has started.
+ * 
+ * @version $Revision$
+ */
+public class ConsumerStartedEvent extends ConsumerEvent {
+
+    private static final long serialVersionUID = 5088138839609391074L;
+
+    private final ConsumerInfo consumerInfo;
+
+    public ConsumerStartedEvent(ConsumerEventSource source, ActiveMQDestination destination, ConsumerInfo consumerInfo, int count) {
+        super(source, destination, consumerInfo.getConsumerId(), count);
+        this.consumerInfo = consumerInfo;
+    }
+
+    public boolean isStarted() {
+        return true;
+    }
+
+    /**
+     * @return details of the subscription
+     */
+    public ConsumerInfo getConsumerInfo() {
+        return consumerInfo;
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStartedEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStartedEvent.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStartedEvent.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStoppedEvent.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStoppedEvent.java?rev=356524&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStoppedEvent.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStoppedEvent.java Tue Dec 13 08:21:32 2005
@@ -0,0 +1,40 @@
+/**
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.activemq.advisory;
+
+import org.activemq.command.ActiveMQDestination;
+import org.activemq.command.ConsumerId;
+
+/**
+ * An event generated when a consumer stops.
+ * 
+ * @version $Revision$
+ */
+public class ConsumerStoppedEvent extends ConsumerEvent {
+
+    private static final long serialVersionUID = 5378835541037193206L;
+
+    public ConsumerStoppedEvent(ConsumerEventSource source, ActiveMQDestination destination, ConsumerId consumerId, int count) {
+        super(source, destination, consumerId, count);
+    }
+
+    public boolean isStarted() {
+        return false;
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStoppedEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStoppedEvent.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/ConsumerStoppedEvent.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/package.html
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/package.html?rev=356524&r1=356523&r2=356524&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/package.html (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/package.html Tue Dec 13 08:21:32 2005
@@ -3,7 +3,7 @@
 </head>
 <body>
 
-Support for JMS Advisory messages
+Support for JMS Advisory messages as well as some helper listeners to listen to the clients, producers and consumers available.
 
 </body>
 </html>

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/advisory/ConsumerListenerTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/advisory/ConsumerListenerTest.java?rev=356524&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/advisory/ConsumerListenerTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/advisory/ConsumerListenerTest.java Tue Dec 13 08:21:32 2005
@@ -0,0 +1,136 @@
+/**
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.activemq.advisory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
+import org.activemq.EmbeddedBrokerTestSupport;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+/**
+ * 
+ * @version $Revision$
+ */
+public class ConsumerListenerTest extends EmbeddedBrokerTestSupport implements ConsumerListener {
+
+    protected Session consumerSession1;
+    protected Session consumerSession2;
+    protected int consumerCounter;
+    protected ConsumerEventSource consumerEventSource;
+    protected BlockingQueue eventQueue = new ArrayBlockingQueue(1000);
+    private Connection connection;
+
+    public void testConsumerEvents() throws Exception {
+        consumerEventSource.start();
+
+        consumerSession1 = createConsumer();
+        assertConsumerEvent(1, true);
+
+        consumerSession2 = createConsumer();
+        assertConsumerEvent(2, true);
+
+        consumerSession1.close();
+        consumerSession1 = null;
+        assertConsumerEvent(1, false);
+
+        consumerSession2.close();
+        consumerSession2 = null;
+        assertConsumerEvent(0, false);
+    }
+
+    public void testListenWhileAlreadyConsumersActive() throws Exception {
+        consumerSession1 = createConsumer();
+        consumerSession2 = createConsumer();
+
+        consumerEventSource.start();
+        assertConsumerEvent(2, true);
+        assertConsumerEvent(2, true);
+
+        consumerSession1.close();
+        consumerSession1 = null;
+        assertConsumerEvent(1, false);
+
+        consumerSession2.close();
+        consumerSession2 = null;
+        assertConsumerEvent(0, false);
+    }
+
+    public void onConsumerEvent(ConsumerEvent event) {
+        eventQueue.add(event);
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        connection = createConnection();
+        connection.start();
+        consumerEventSource = new ConsumerEventSource(connection, destination);
+        consumerEventSource.setConsumerListener(this);
+    }
+
+    protected void tearDown() throws Exception {
+        if (consumerEventSource != null) {
+            consumerEventSource.stop();
+        }
+        if (consumerSession2 != null) {
+            consumerSession2.close();
+        }
+        if (consumerSession1 != null) {
+            consumerSession1.close();
+        }
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+
+    protected void assertConsumerEvent(int count, boolean started) throws InterruptedException {
+        ConsumerEvent event = waitForConsumerEvent();
+        assertEquals("Consumer count", count, event.getConsumerCount());
+        assertEquals("started", started, event.isStarted());
+    }
+
+    protected Session createConsumer() throws JMSException {
+        final String consumerText = "Consumer: " + (++consumerCounter);
+        System.out.println("Creating consumer: " + consumerText + " on destination: " + destination);
+        
+        Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = answer.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                System.out.println("Received message by: " + consumerText + " message: " + message);
+            }
+        });
+        return answer;
+    }
+
+    protected ConsumerEvent waitForConsumerEvent() throws InterruptedException {
+        ConsumerEvent answer = (ConsumerEvent) eventQueue.poll(100000, TimeUnit.MILLISECONDS);
+        assertTrue("Should have received a consumer event!", answer != null);
+        return answer;
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/advisory/ConsumerListenerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/advisory/ConsumerListenerTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/advisory/ConsumerListenerTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message