activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r380795 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/advisory/ test/java/org/apache/activemq/advisory/
Date Fri, 24 Feb 2006 19:30:51 GMT
Author: rajdavies
Date: Fri Feb 24 11:30:49 2006
New Revision: 380795

URL: http://svn.apache.org/viewcvs?rev=380795&view=rev
Log:
Added ProducerEvent advisory support

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEvent.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerListener.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStartedEvent.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStoppedEvent.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=380795&r1=380794&r2=380795&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Fri Feb 24 11:30:49 2006
@@ -107,7 +107,7 @@
                 for (Iterator iter = producers.values().iterator(); iter.hasNext();) {
                     ProducerInfo value = (ProducerInfo) iter.next();
                     ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
-                    fireAdvisory(context, topic, value, info.getConsumerId());
+                    fireProducerAdvisory(context, topic, value, info.getConsumerId());
                 }
             }
             
@@ -177,8 +177,8 @@
         // Don't advise advisory topics.
         if( info.getDestination()!=null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())
) { 
             ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
-            fireAdvisory(context, topic, info.createRemoveCommand());
             producers.remove(info.getProducerId());
+            fireProducerAdvisory(context, topic, info.createRemoveCommand());
         }
     }
     
@@ -197,6 +197,15 @@
     protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command
command, ConsumerId targetConsumerId) throws Throwable {
         ActiveMQMessage advisoryMessage = new ActiveMQMessage();
         advisoryMessage.setIntProperty("consumerCount", consumers.size());
+        fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
+    }
+    
+    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command
command) throws Throwable {
+        fireProducerAdvisory(context, topic, command, null);
+    }
+    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command
command, ConsumerId targetConsumerId) throws Throwable {
+        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+        advisoryMessage.setIntProperty("producerCount", producers.size());
         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
     }
 

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEvent.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEvent.java?rev=380795&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEvent.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEvent.java
Fri Feb 24 11:30:49 2006
@@ -0,0 +1,64 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import org.apache.activemq.command.ProducerId;
+
+import javax.jms.Destination;
+
+import java.util.EventObject;
+
+/**
+ * An event raised when the number of producers on a given destination changes.
+ * 
+ * @version $Revision: 359679 $
+ */
+public abstract class ProducerEvent extends EventObject {
+    private static final long serialVersionUID = 2442156576867593780L;
+    private final Destination destination;
+    private final ProducerId producerId;
+    private final int producerCount;
+
+    public ProducerEvent(ProducerEventSource source, Destination destination, ProducerId
producerId, int producerCount) {
+        super(source);
+        this.destination = destination;
+        this.producerId = producerId;
+        this.producerCount = producerCount;
+    }
+
+    public ProducerEventSource getAdvisor() {
+        return (ProducerEventSource) getSource();
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    /**
+     * Returns the current number of producers active at the time this advisory was sent.
+     * 
+     */
+    public int getProducerCount() {
+        return producerCount;
+    }
+
+    public ProducerId getProducerId() {
+        return producerId;
+    }
+
+    public abstract boolean isStarted();
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java?rev=380795&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java
Fri Feb 24 11:30:49 2006
@@ -0,0 +1,129 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import org.apache.activemq.Service;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An object which can be used to listen to the number of active producers
+ * available on a given destination.
+ * 
+ * @version $Revision: 359679 $
+ */
+public class ProducerEventSource implements Service, MessageListener {
+    private static final Log log = LogFactory.getLog(ProducerEventSource.class);
+
+    private final Connection connection;
+    private final ActiveMQDestination destination;
+    private ProducerListener listener;
+    private AtomicBoolean started = new AtomicBoolean(false);
+    private AtomicInteger producerCount = new AtomicInteger();
+    private Session session;
+    private MessageConsumer consumer;
+
+    public ProducerEventSource(Connection connection, Destination destination) throws JMSException
{
+        this.connection = connection;
+        this.destination = ActiveMQDestination.transform(destination);
+    }
+
+    public void setProducerListener(ProducerListener listener) {
+        this.listener = listener;
+    }
+
+    public void start() throws Exception {
+        if (started.compareAndSet(false, true)) {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            ActiveMQTopic advisoryTopic = AdvisorySupport.getProducerAdvisoryTopic(destination);
+            consumer = session.createConsumer(advisoryTopic);
+            consumer.setMessageListener(this);
+        }
+    }
+
+    public void stop() throws Exception {
+        if (started.compareAndSet(true, false)) {
+            if (session != null) {
+                session.close();
+            }
+        }
+    }
+
+    public void onMessage(Message message) {
+        if (message instanceof ActiveMQMessage) {
+            ActiveMQMessage activeMessage = (ActiveMQMessage) message;
+            Object command = activeMessage.getDataStructure();
+            int count = 0;
+            if (command instanceof ProducerInfo) {
+                count = producerCount.incrementAndGet();
+                count = extractProducerCountFromMessage(message, count);
+                fireProducerEvent(new ProducerStartedEvent(this, destination, (ProducerInfo)
command, count));
+            }
+            else if (command instanceof RemoveInfo) {
+                RemoveInfo removeInfo = (RemoveInfo) command;
+                if (removeInfo.isProducerRemove()) {
+                    count = producerCount.decrementAndGet();
+                    count = extractProducerCountFromMessage(message, count);
+                    fireProducerEvent(new ProducerStoppedEvent(this, destination, (ProducerId)
removeInfo.getObjectId(), count));
+                }
+            }
+            else {
+                log.warn("Unknown command: " + command);
+            }
+        }
+        else {
+            log.warn("Unknown message type: " + message + ". Message ignored");
+        }
+    }
+
+    protected int extractProducerCountFromMessage(Message message, int count) {
+        try {
+            Object value = message.getObjectProperty("producerCount");
+            if (value instanceof Number) {
+                Number n = (Number) value;
+                return n.intValue();
+            }
+            log.warn("No producerCount header available on the message: " + message);
+        }
+        catch (Exception e) {
+            log.warn("Failed to extract producerCount from message: " + message + ".Reason:
" + e, e);
+        }
+        return count;
+    }
+
+    protected void fireProducerEvent(ProducerEvent event) {
+        if (listener != null) {
+            listener.onProducerEvent(event);
+        }
+    }
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerListener.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerListener.java?rev=380795&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerListener.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerListener.java
Fri Feb 24 11:30:49 2006
@@ -0,0 +1,27 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+/**
+ * Listen to the changes in the number of active producers available for a given destination.
+ * 
+ * @version $Revision: 359679 $
+ */
+public interface ProducerListener {
+
+    public void onProducerEvent(ProducerEvent event);
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStartedEvent.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStartedEvent.java?rev=380795&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStartedEvent.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStartedEvent.java
Fri Feb 24 11:30:49 2006
@@ -0,0 +1,48 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ProducerInfo;
+
+/**
+ * An event generated when a new producer has started.
+ * 
+ * @version $Revision: 359679 $
+ */
+public class ProducerStartedEvent extends ProducerEvent {
+
+    private static final long serialVersionUID = 5088138839609391074L;
+
+    private final ProducerInfo consumerInfo;
+
+    public ProducerStartedEvent(ProducerEventSource source, ActiveMQDestination destination,
ProducerInfo consumerInfo, int count) {
+        super(source, destination, consumerInfo.getProducerId(), count);
+        this.consumerInfo = consumerInfo;
+    }
+
+    public boolean isStarted() {
+        return true;
+    }
+
+    /**
+     * @return details of the producer that started
+     */
+    public ProducerInfo getProducerInfo() {
+        return consumerInfo;
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStoppedEvent.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStoppedEvent.java?rev=380795&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStoppedEvent.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/ProducerStoppedEvent.java
Fri Feb 24 11:30:49 2006
@@ -0,0 +1,39 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ProducerId;
+
+/**
+ * An event generated when a producer stops.
+ * 
+ * @version $Revision: 359679 $
+ */
+public class ProducerStoppedEvent extends ProducerEvent {
+
+    private static final long serialVersionUID = 5378835541037193206L;
+
+    public ProducerStoppedEvent(ProducerEventSource source, ActiveMQDestination destination,
ProducerId consumerId, int count) {
+        super(source, destination, consumerId, count);
+    }
+
+    public boolean isStarted() {
+        return false;
+    }
+
+}

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java?rev=380795&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/ProducerListenerTest.java
Fri Feb 24 11:30:49 2006
@@ -0,0 +1,134 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.advisory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.advisory.ConsumerEvent;
+import org.apache.activemq.advisory.ConsumerEventSource;
+import org.apache.activemq.advisory.ConsumerListener;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
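+ * Tests that ProducerEventSource reports producer start/stop events with the expected producer count.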
+ * @version $Revision: 359679 $
+ */
+public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements ProducerListener
{
+
+    protected Session consumerSession1;
+    protected Session consumerSession2;
+    protected int consumerCounter;
+    protected ProducerEventSource producerEventSource;
+    protected BlockingQueue eventQueue = new ArrayBlockingQueue(1000);
+    private Connection connection;
+
+    public void testProducerEvents() throws Exception {
+        producerEventSource.start();
+
+        consumerSession1 = createProducer();
+        assertConsumerEvent(1, true);
+
+        consumerSession2 = createProducer();
+        assertConsumerEvent(2, true);
+
+        consumerSession1.close();
+        consumerSession1 = null;
+        assertConsumerEvent(1, false);
+
+        consumerSession2.close();
+        consumerSession2 = null;
+        assertConsumerEvent(0, false);
+    }
+
+    public void testListenWhileAlreadyConsumersActive() throws Exception {
+        consumerSession1 = createProducer();
+        consumerSession2 = createProducer();
+
+        producerEventSource.start();
+        assertConsumerEvent(2, true);
+        assertConsumerEvent(2, true);
+
+        consumerSession1.close();
+        consumerSession1 = null;
+        assertConsumerEvent(1, false);
+
+        consumerSession2.close();
+        consumerSession2 = null;
+        assertConsumerEvent(0, false);
+    }
+
+    public void onProducerEvent(ProducerEvent event) {
+        eventQueue.add(event);
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        connection = createConnection();
+        connection.start();
+        producerEventSource = new ProducerEventSource(connection, destination);
+        producerEventSource.setProducerListener(this);
+    }
+
+    protected void tearDown() throws Exception {
+        if (producerEventSource != null) {
+            producerEventSource.stop();
+        }
+        if (consumerSession2 != null) {
+            consumerSession2.close();
+        }
+        if (consumerSession1 != null) {
+            consumerSession1.close();
+        }
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+
+    protected void assertConsumerEvent(int count, boolean started) throws InterruptedException
{
+        ProducerEvent event = waitForProducerEvent();
+        assertEquals("Producer count", count, event.getProducerCount());
+        assertEquals("started", started, event.isStarted());
+    }
+
+    protected Session createProducer() throws JMSException {
+        final String consumerText = "Consumer: " + (++consumerCounter);
+        System.out.println("Creating consumer: " + consumerText + " on destination: " + destination);
+        
+        Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = answer.createProducer(destination);
+        return answer;
+    }
+
+    protected ProducerEvent waitForProducerEvent() throws InterruptedException {
+        ProducerEvent answer = (ProducerEvent) eventQueue.poll(100000, TimeUnit.MILLISECONDS);
+        assertTrue("Should have received a consumer event!", answer != null);
+        return answer;
+    }
+
+}



Mime
View raw message