Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 12792 invoked from network); 2 Jun 2009 21:30:23 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 2 Jun 2009 21:30:23 -0000 Received: (qmail 96413 invoked by uid 500); 2 Jun 2009 21:30:35 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 96385 invoked by uid 500); 2 Jun 2009 21:30:35 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 96376 invoked by uid 99); 2 Jun 2009 21:30:35 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Jun 2009 21:30:35 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Jun 2009 21:30:31 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id E8619238893B; Tue, 2 Jun 2009 21:29:40 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r781177 [9/11] - in /activemq/sandbox/activemq-flow: activemq-bio/ activemq-bio/src/main/java/org/ activemq-bio/src/main/java/org/apache/ activemq-bio/src/main/java/org/apache/activemq/ activemq-bio/src/main/java/org/apache/activemq/transpo... 
Date: Tue, 02 Jun 2009 21:29:35 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090602212940.E8619238893B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableListener.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableListener.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableListener.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageAvailableListener.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.MessageConsumer; + +/** + * A listener which is notified if a message is available for processing via the + * receive methods. Typically on receiving this notification you can call + * {@link MessageConsumer#receiveNoWait()} to get the new message immediately. 
+ * + * Note that this notification just indicates a message is available for synchronous consumption, + * it does not actually consume the message. + * + * @version $Revision: 1.1 $ + */ +public interface MessageAvailableListener { + + void onMessageAvailable(MessageConsumer consumer); + +} Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +import javax.jms.JMSException; + +import org.apache.activemq.command.MessageDispatch; + +public class MessageDispatchChannel { + + private final Object mutex = new Object(); + private final LinkedList list; + private boolean closed; + private boolean running; + + public MessageDispatchChannel() { + this.list = new LinkedList(); + } + + public void enqueue(MessageDispatch message) { + synchronized (mutex) { + list.addLast(message); + mutex.notify(); + } + } + + public void enqueueFirst(MessageDispatch message) { + synchronized (mutex) { + list.addFirst(message); + mutex.notify(); + } + } + + public boolean isEmpty() { + synchronized (mutex) { + return list.isEmpty(); + } + } + + /** + * Used to get an enqueued message. The amount of time this method blocks is + * based on the timeout value. - if timeout==-1 then it blocks until a + * message is received. - if timeout==0 then it tries to not block at + * all, it returns a message if it is available - if timeout>0 then it + * blocks up to timeout amount of time. Expired messages will be consumed by + * this method. + * + * @throws JMSException + * @return null if we timeout or if the consumer is closed. + * @throws InterruptedException + */ + public MessageDispatch dequeue(long timeout) throws InterruptedException { + synchronized (mutex) { + // Wait until the consumer is ready to deliver messages. 
+ while (timeout != 0 && !closed && (list.isEmpty() || !running)) { + if (timeout == -1) { + mutex.wait(); + } else { + mutex.wait(timeout); + break; + } + } + if (closed || !running || list.isEmpty()) { + return null; + } + return list.removeFirst(); + } + } + + public MessageDispatch dequeueNoWait() { + synchronized (mutex) { + if (closed || !running || list.isEmpty()) { + return null; + } + return list.removeFirst(); + } + } + + public MessageDispatch peek() { + synchronized (mutex) { + if (closed || !running || list.isEmpty()) { + return null; + } + return list.getFirst(); + } + } + + public void start() { + synchronized (mutex) { + running = true; + mutex.notifyAll(); + } + } + + public void stop() { + synchronized (mutex) { + running = false; + mutex.notifyAll(); + } + } + + public void close() { + synchronized (mutex) { + if (!closed) { + running = false; + closed = true; + } + mutex.notifyAll(); + } + } + + public void clear() { + synchronized (mutex) { + list.clear(); + } + } + + public boolean isClosed() { + return closed; + } + + public int size() { + synchronized (mutex) { + return list.size(); + } + } + + public Object getMutex() { + return mutex; + } + + public boolean isRunning() { + return running; + } + + public List removeAll() { + synchronized (mutex) { + ArrayList rc = new ArrayList(list); + list.clear(); + return rc; + } + } + + public String toString() { + synchronized (mutex) { + return list.toString(); + } + } +} Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageDispatchChannel.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformer.java?rev=781177&view=auto 
============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformer.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformer.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +/** + * A plugin strategy for transforming a message before it is sent by the JMS client or before it is + * dispatched to the JMS consumer + * + * @version $Revision: 564271 $ + */ +public interface MessageTransformer { + + /** + * Transforms the given message inside the producer before it is sent to the JMS bus. 
+ */ + Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException; + + /** + * Transforms the given message inside the consumer before being dispatched to the client code + */ + Message consumerTransform(Session session, MessageConsumer consumer, Message message)throws JMSException; +} Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformerSupport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformerSupport.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformerSupport.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/MessageTransformerSupport.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.JMSException; +import javax.jms.Message; + +/** + * A useful base class for message transformers. 
+ * + * @version $Revision: 563921 $ + */ +public abstract class MessageTransformerSupport implements MessageTransformer { + + /** + * Copies the standard JMS and user defined properties from the given message to the specified message + * + * @param fromMessage the message to take the properties from + * @param toMesage the message to add the properties to + * @throws JMSException + */ + protected void copyProperties(Message fromMessage, Message toMesage) throws JMSException { + ActiveMQMessageTransformation.copyProperties(fromMessage, toMesage); + } +} Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import javax.jms.IllegalStateException; + +/** + * An exception thrown when an operation is invoked on a service + * which has not yet been started. + * + * @version $Revision: 1.2 $ + */ +public class NotStartedException extends IllegalStateException { + + private static final long serialVersionUID = -4907909323529887659L; + + public NotStartedException() { + super("IllegalState: This service has not yet been started", "AMQ-1003"); + } +} Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/NotStartedException.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/RedeliveryPolicy.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.io.Serializable; +import java.util.Random; + +/** + * Configuration options used to control how messages are re-delivered when they + * are rolled back. + * + * @org.apache.xbean.XBean element="redeliveryPolicy" + * @version $Revision: 1.11 $ + */ +public class RedeliveryPolicy implements Cloneable, Serializable { + + public static final int NO_MAXIMUM_REDELIVERIES = -1; + private static Random randomNumberGenerator; + + // +/-15% for a 30% spread -cgs + private double collisionAvoidanceFactor = 0.15d; + private int maximumRedeliveries = 6; + private long initialRedeliveryDelay = 1000L; + private boolean useCollisionAvoidance; + private boolean useExponentialBackOff; + private short backOffMultiplier = 5; + + public RedeliveryPolicy() { + } + + public RedeliveryPolicy copy() { + try { + return (RedeliveryPolicy)clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Could not clone: " + e, e); + } + } + + public short getBackOffMultiplier() { + return backOffMultiplier; + } + + public void setBackOffMultiplier(short backOffMultiplier) { + this.backOffMultiplier = backOffMultiplier; + } + + public short getCollisionAvoidancePercent() { + return (short)Math.round(collisionAvoidanceFactor * 100); + } + + public void setCollisionAvoidancePercent(short collisionAvoidancePercent) { + this.collisionAvoidanceFactor = collisionAvoidancePercent * 0.01d; + } + + public long getInitialRedeliveryDelay() { + return initialRedeliveryDelay; + } + + public void setInitialRedeliveryDelay(long 
initialRedeliveryDelay) { + this.initialRedeliveryDelay = initialRedeliveryDelay; + } + + public int getMaximumRedeliveries() { + return maximumRedeliveries; + } + + public void setMaximumRedeliveries(int maximumRedeliveries) { + this.maximumRedeliveries = maximumRedeliveries; + } + + public long getRedeliveryDelay(long previousDelay) { + long redeliveryDelay; + + if (previousDelay == 0) { + redeliveryDelay = initialRedeliveryDelay; + } else if (useExponentialBackOff && backOffMultiplier > 1) { + redeliveryDelay = previousDelay * backOffMultiplier; + } else { + redeliveryDelay = previousDelay; + } + + if (useCollisionAvoidance) { + /* + * First random determines +/-, second random determines how far to + * go in that direction. -cgs + */ + Random random = getRandomNumberGenerator(); + double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble(); + redeliveryDelay += redeliveryDelay * variance; + } + + return redeliveryDelay; + } + + public boolean isUseCollisionAvoidance() { + return useCollisionAvoidance; + } + + public void setUseCollisionAvoidance(boolean useCollisionAvoidance) { + this.useCollisionAvoidance = useCollisionAvoidance; + } + + public boolean isUseExponentialBackOff() { + return useExponentialBackOff; + } + + public void setUseExponentialBackOff(boolean useExponentialBackOff) { + this.useExponentialBackOff = useExponentialBackOff; + } + + protected static synchronized Random getRandomNumberGenerator() { + if (randomNumberGenerator == null) { + randomNumberGenerator = new Random(); + } + return randomNumberGenerator; + } + +} Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Service.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Service.java?rev=781177&view=auto ============================================================================== --- 
activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Service.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/Service.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + + +/** + * The core lifecycle interface for ActiveMQ components. 
+ * + * If there was a standard way to do so, it'd be good to register this + * interface with Spring so it treats the start/stop methods as those of + * {@link org.springframework.beans.factory.InitializingBean} + * and {@link org.springframework.beans.factory.DisposableBean} + * + * @version $Revision: 1.1 $ + */ +public interface Service { + + void start() throws Exception; + + void stop() throws Exception; + +} Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/StreamConnection.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.Topic; + +/** + * The StreamConnection interface allows you to send and receive data from a + * Destination using standard Java InputStream and OutputStream objects. Its + * best use case is to send and receive large amounts of data that would be too + * large to hold in a single JMS message. + * + * @version $Revision$ + */ +public interface StreamConnection extends Connection { + + InputStream createInputStream(Destination dest) throws JMSException; + + InputStream createInputStream(Destination dest, String messageSelector) throws JMSException; + + InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException; + + InputStream createDurableInputStream(Topic dest, String name) throws JMSException; + + InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException; + + InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException; + + OutputStream createOutputStream(Destination dest) throws JMSException; + + OutputStream createOutputStream(Destination dest, Map streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException; + + /** + * Unsubscribes a durable subscription that has been created by a client. + *

+ * This method deletes the state being maintained on behalf of the + * subscriber by its provider. + *

+ * It is erroneous for a client to delete a durable subscription while there + * is an active MessageConsumer or + * TopicSubscriber for the subscription, or while a consumed + * message is part of a pending transaction or has not been acknowledged in + * the session. + * + * @param name the name used to identify this subscription + * @throws JMSException if the session fails to unsubscribe to the durable + * subscription due to some internal error. + * @throws InvalidDestinationException if an invalid subscription name is + * specified. + * @since 1.1 + */ + void unsubscribe(String name) throws JMSException; +} Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + + +/** +* A holder for different thread priorities used in ActiveMQ +* +* @version $Revision: 1.9 $ +*/ + +public interface ThreadPriorities { + int INBOUND_BROKER_CONNECTION = 6; + int OUT_BOUND_BROKER_DISPATCH = 6; + int INBOUND_CLIENT_CONNECTION = 7; + int INBOUND_CLIENT_SESSION = 7; + int BROKER_MANAGEMENT = 9; +} Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ThreadPriorities.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,659 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import javax.jms.JMSException; +import javax.jms.TransactionInProgressException; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.DataArrayResponse; +import org.apache.activemq.command.DataStructure; +import org.apache.activemq.command.IntegerResponse; +import org.apache.activemq.command.LocalTransactionId; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.transaction.Synchronization; +import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.activemq.util.LongSequenceGenerator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A TransactionContext provides the means to control a JMS transaction. It + * provides a local transaction interface and also an XAResource interface.

+ * An application server controls the transactional assignment of an XASession + * by obtaining its XAResource. It uses the XAResource to assign the session to + * a transaction, prepare and commit work on the transaction, and so on.

An + * XAResource provides some fairly sophisticated facilities for interleaving + * work on multiple transactions, recovering a list of transactions in progress, + * and so on. A JTA aware JMS provider must fully implement this functionality. + * This could be done by using the services of a database that supports XA, or a + * JMS provider may choose to implement this functionality from scratch.

+ * + * @version $Revision: 1.10 $ + * @see javax.jms.Session + * @see javax.jms.QueueSession + * @see javax.jms.TopicSession + * @see javax.jms.XASession + */ +public class TransactionContext implements XAResource { + + private static final Log LOG = LogFactory.getLog(TransactionContext.class); + + // XATransactionId -> ArrayList of TransactionContext objects + private final ConcurrentHashMap> ENDED_XA_TRANSACTION_CONTEXTS = new ConcurrentHashMap>(); + + private final ActiveMQConnection connection; + private final LongSequenceGenerator localTransactionIdGenerator; + private final ConnectionId connectionId; + private List synchronizations; + + // To track XA transactions. + private Xid associatedXid; + private TransactionId transactionId; + private LocalTransactionEventListener localTransactionEventListener; + + public TransactionContext(ActiveMQConnection connection) { + this.connection = connection; + this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator(); + this.connectionId = connection.getConnectionInfo().getConnectionId(); + } + + public boolean isInXATransaction() { + return transactionId != null && transactionId.isXATransaction(); + } + + public boolean isInLocalTransaction() { + return transactionId != null && transactionId.isLocalTransaction(); + } + + public boolean isInTransaction() { + return transactionId != null; + } + + /** + * @return Returns the localTransactionEventListener. + */ + public LocalTransactionEventListener getLocalTransactionEventListener() { + return localTransactionEventListener; + } + + /** + * Used by the resource adapter to listen to transaction events. + * + * @param localTransactionEventListener The localTransactionEventListener to + * set. 
+ */ + public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) { + this.localTransactionEventListener = localTransactionEventListener; + } + + // /////////////////////////////////////////////////////////// + // + // Methods that work with the Synchronization objects registered with + // the transaction. + // + // /////////////////////////////////////////////////////////// + + public void addSynchronization(Synchronization s) { + if (synchronizations == null) { + synchronizations = new ArrayList(10); + } + synchronizations.add(s); + } + + private void afterRollback() throws JMSException { + if (synchronizations == null) { + return; + } + + int size = synchronizations.size(); + try { + for (int i = 0; i < size; i++) { + synchronizations.get(i).afterRollback(); + } + } catch (JMSException e) { + throw e; + } catch (Throwable e) { + throw JMSExceptionSupport.create(e); + } finally { + synchronizations = null; + } + } + + private void afterCommit() throws JMSException { + if (synchronizations == null) { + return; + } + + int size = synchronizations.size(); + try { + for (int i = 0; i < size; i++) { + synchronizations.get(i).afterCommit(); + } + } catch (JMSException e) { + throw e; + } catch (Throwable e) { + throw JMSExceptionSupport.create(e); + } finally { + synchronizations = null; + } + } + + private void beforeEnd() throws JMSException { + if (synchronizations == null) { + return; + } + + int size = synchronizations.size(); + try { + for (int i = 0; i < size; i++) { + synchronizations.get(i).beforeEnd(); + } + } catch (JMSException e) { + throw e; + } catch (Throwable e) { + throw JMSExceptionSupport.create(e); + } + } + + public TransactionId getTransactionId() { + return transactionId; + } + + // /////////////////////////////////////////////////////////// + // + // Local transaction interface. + // + // /////////////////////////////////////////////////////////// + + /** + * Start a local transaction. 
+ * @throws javax.jms.JMSException on internal error + */ + public void begin() throws JMSException { + + if (isInXATransaction()) { + throw new TransactionInProgressException("Cannot start local transaction. XA transaction is already in progress."); + } + + if (transactionId == null) { + synchronizations = null; + this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId()); + TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); + this.connection.ensureConnectionInfoSent(); + this.connection.asyncSendPacket(info); + + // Notify the listener that the tx was started. + if (localTransactionEventListener != null) { + localTransactionEventListener.beginEvent(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Begin:" + transactionId); + } + } + + } + + /** + * Rolls back any work done in this transaction and releases any locks + * currently held. + * + * @throws JMSException if the JMS provider fails to roll back the + * transaction due to some internal error. + * @throws javax.jms.IllegalStateException if the method is not called by a + * transacted session. + */ + public void rollback() throws JMSException { + if (isInXATransaction()) { + throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress "); + } + + beforeEnd(); + if (transactionId != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Rollback: " + transactionId + + " syncCount: " + + (synchronizations != null ? 
synchronizations.size() : 0)); + } + + TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); + this.transactionId = null; + this.connection.asyncSendPacket(info); + // Notify the listener that the tx was rolled back + if (localTransactionEventListener != null) { + localTransactionEventListener.rollbackEvent(); + } + } + + afterRollback(); + } + + /** + * Commits all work done in this transaction and releases any locks + * currently held. + * + * @throws JMSException if the JMS provider fails to commit the transaction + * due to some internal error. + * @throws javax.jms.IllegalStateException if the method is not called by a + * transacted session. + */ + public void commit() throws JMSException { + if (isInXATransaction()) { + throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress "); + } + + beforeEnd(); + + // Only send commit if the transaction was started. + if (transactionId != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Commit: " + transactionId + + " syncCount: " + + (synchronizations != null ? synchronizations.size() : 0)); + } + + TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE); + this.transactionId = null; + // Notify the listener that the tx was committed back + this.connection.syncSendPacket(info); + if (localTransactionEventListener != null) { + localTransactionEventListener.commitEvent(); + } + afterCommit(); + } + } + + // /////////////////////////////////////////////////////////// + // + // XAResource Implementation + // + // /////////////////////////////////////////////////////////// + /** + * Associates a transaction with the resource. + */ + public void start(Xid xid, int flags) throws XAException { + + if (LOG.isDebugEnabled()) { + LOG.debug("Start: " + xid); + } + if (isInLocalTransaction()) { + throw new XAException(XAException.XAER_PROTO); + } + // Are we already associated? 
+ if (associatedXid != null) { + throw new XAException(XAException.XAER_PROTO); + } + + // if ((flags & TMJOIN) == TMJOIN) { + // // TODO: verify that the server has seen the xid + // } + // if ((flags & TMJOIN) == TMRESUME) { + // // TODO: verify that the xid was suspended. + // } + + // associate + synchronizations = null; + setXid(xid); + } + + /** + * @return connectionId for connection + */ + private ConnectionId getConnectionId() { + return connection.getConnectionInfo().getConnectionId(); + } + + public void end(Xid xid, int flags) throws XAException { + + if (LOG.isDebugEnabled()) { + LOG.debug("End: " + xid); + } + + if (isInLocalTransaction()) { + throw new XAException(XAException.XAER_PROTO); + } + + if ((flags & (TMSUSPEND | TMFAIL)) != 0) { + // You can only suspend the associated xid. + if (!equals(associatedXid, xid)) { + throw new XAException(XAException.XAER_PROTO); + } + + // TODO: we may want to put the xid in a suspended list. + try { + beforeEnd(); + } catch (JMSException e) { + throw toXAException(e); + } + setXid(null); + } else if ((flags & TMSUCCESS) == TMSUCCESS) { + // set to null if this is the current xid. + // otherwise this could be an asynchronous success call + if (equals(associatedXid, xid)) { + try { + beforeEnd(); + } catch (JMSException e) { + throw toXAException(e); + } + setXid(null); + } + } else { + throw new XAException(XAException.XAER_INVAL); + } + } + + private boolean equals(Xid xid1, Xid xid2) { + if (xid1 == xid2) { + return true; + } + if (xid1 == null ^ xid2 == null) { + return false; + } + return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier()) + && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId()); + } + + public int prepare(Xid xid) throws XAException { + if (LOG.isDebugEnabled()) { + LOG.debug("Prepare: " + xid); + } + + // We allow interleaving multiple transactions, so + // we don't limit prepare to the associated xid. 
+ XATransactionId x; + // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been + // called first + if (xid == null || (equals(associatedXid, xid))) { + throw new XAException(XAException.XAER_PROTO); + } else { + // TODO: cache the known xids so we don't keep recreating this one?? + x = new XATransactionId(xid); + } + + try { + TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE); + + // Find out if the server wants to commit or rollback. + IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info); + return response.getResult(); + + } catch (JMSException e) { + throw toXAException(e); + } + } + + public void rollback(Xid xid) throws XAException { + + if (LOG.isDebugEnabled()) { + LOG.debug("Rollback: " + xid); + } + + // We allow interleaving multiple transactions, so + // we don't limit rollback to the associated xid. + XATransactionId x; + if (xid == null) { + throw new XAException(XAException.XAER_PROTO); + } + if (equals(associatedXid, xid)) { + // I think this can happen even without an end(xid) call. Need to + // check spec. + x = (XATransactionId)transactionId; + } else { + x = new XATransactionId(xid); + } + + try { + this.connection.checkClosedOrFailed(); + this.connection.ensureConnectionInfoSent(); + + // Let the server know that the tx is rollback. + TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK); + this.connection.syncSendPacket(info); + + List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); + if (l != null && !l.isEmpty()) { + for (TransactionContext ctx : l) { + ctx.afterRollback(); + } + } + + } catch (JMSException e) { + throw toXAException(e); + } + } + + // XAResource interface + public void commit(Xid xid, boolean onePhase) throws XAException { + + if (LOG.isDebugEnabled()) { + LOG.debug("Commit: " + xid); + } + + // We allow interleaving multiple transactions, so + // we don't limit commit to the associated xid. 
+ XATransactionId x; + if (xid == null || (equals(associatedXid, xid))) { + // should never happen, end(xid,TMSUCCESS) must have been previously + // called + throw new XAException(XAException.XAER_PROTO); + } else { + x = new XATransactionId(xid); + } + + try { + this.connection.checkClosedOrFailed(); + this.connection.ensureConnectionInfoSent(); + + // Notify the server that the tx was committed back + TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE); + + this.connection.syncSendPacket(info); + + List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); + if (l != null && !l.isEmpty()) { + for (TransactionContext ctx : l) { + ctx.afterCommit(); + } + } + + } catch (JMSException e) { + throw toXAException(e); + } + + } + + public void forget(Xid xid) throws XAException { + if (LOG.isDebugEnabled()) { + LOG.debug("Forget: " + xid); + } + + // We allow interleaving multiple transactions, so + // we don't limit forget to the associated xid. + XATransactionId x; + if (xid == null) { + throw new XAException(XAException.XAER_PROTO); + } + if (equals(associatedXid, xid)) { + // TODO determine if this can happen... I think not. + x = (XATransactionId)transactionId; + } else { + x = new XATransactionId(xid); + } + + TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET); + + try { + // Tell the server to forget the transaction. 
+ this.connection.syncSendPacket(info); + } catch (JMSException e) { + throw toXAException(e); + } + } + + public boolean isSameRM(XAResource xaResource) throws XAException { + if (xaResource == null) { + return false; + } + if (!(xaResource instanceof TransactionContext)) { + return false; + } + TransactionContext xar = (TransactionContext)xaResource; + try { + return getResourceManagerId().equals(xar.getResourceManagerId()); + } catch (Throwable e) { + throw (XAException)new XAException("Could not get resource manager id.").initCause(e); + } + } + + public Xid[] recover(int flag) throws XAException { + if (LOG.isDebugEnabled()) { + LOG.debug("Recover: " + flag); + } + + TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER); + try { + this.connection.checkClosedOrFailed(); + this.connection.ensureConnectionInfoSent(); + + DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info); + DataStructure[] data = receipt.getData(); + XATransactionId[] answer; + if (data instanceof XATransactionId[]) { + answer = (XATransactionId[])data; + } else { + answer = new XATransactionId[data.length]; + System.arraycopy(data, 0, answer, 0, data.length); + } + return answer; + } catch (JMSException e) { + throw toXAException(e); + } + } + + public int getTransactionTimeout() throws XAException { + return 0; + } + + public boolean setTransactionTimeout(int seconds) throws XAException { + return false; + } + + // /////////////////////////////////////////////////////////// + // + // Helper methods. 
+ // + // /////////////////////////////////////////////////////////// + private String getResourceManagerId() throws JMSException { + return this.connection.getResourceManagerId(); + } + + private void setXid(Xid xid) throws XAException { + + try { + this.connection.checkClosedOrFailed(); + this.connection.ensureConnectionInfoSent(); + } catch (JMSException e) { + throw toXAException(e); + } + + if (xid != null) { + // associate + associatedXid = xid; + transactionId = new XATransactionId(xid); + + TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.BEGIN); + try { + this.connection.asyncSendPacket(info); + if (LOG.isDebugEnabled()) { + LOG.debug("Started XA transaction: " + transactionId); + } + } catch (JMSException e) { + throw toXAException(e); + } + + } else { + + if (transactionId != null) { + TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END); + try { + this.connection.syncSendPacket(info); + if (LOG.isDebugEnabled()) { + LOG.debug("Ended XA transaction: " + transactionId); + } + } catch (JMSException e) { + throw toXAException(e); + } + + // Add our self to the list of contexts that are interested in + // post commit/rollback events. + List l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId); + if (l == null) { + l = new ArrayList(3); + ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l); + l.add(this); + } else if (!l.contains(this)) { + l.add(this); + } + } + + // dis-associate + associatedXid = null; + transactionId = null; + } + } + + /** + * Converts a JMSException from the server to an XAException. if the + * JMSException contained a linked XAException that is returned instead. 
+ * + * @param e JMSException to convert + * @return XAException wrapping original exception or its message + */ + private XAException toXAException(JMSException e) { + if (e.getCause() != null && e.getCause() instanceof XAException) { + XAException original = (XAException)e.getCause(); + XAException xae = new XAException(original.getMessage()); + xae.errorCode = original.errorCode; + xae.initCause(original); + return xae; + } + + XAException xae = new XAException(e.getMessage()); + xae.errorCode = XAException.XAER_RMFAIL; + xae.initCause(e); + return xae; + } + + public ActiveMQConnection getConnection() { + return connection; + } + + public void cleanup() { + associatedXid = null; + transactionId = null; + } +} Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEvent.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +import java.util.EventObject; + +import javax.jms.Destination; + +import org.apache.activemq.command.ConsumerId; + +/** + * An event when the number of consumers on a given destination changes. + * + * @version $Revision: 564057 $ + */ +public abstract class ConsumerEvent extends EventObject { + private static final long serialVersionUID = 2442156576867593780L; + private final Destination destination; + private final ConsumerId consumerId; + private final int consumerCount; + + public ConsumerEvent(ConsumerEventSource source, Destination destination, ConsumerId consumerId, int consumerCount) { + super(source); + this.destination = destination; + this.consumerId = consumerId; + this.consumerCount = consumerCount; + } + + public ConsumerEventSource getAdvisor() { + return (ConsumerEventSource) getSource(); + } + + public Destination getDestination() { + return destination; + } + + /** + * Returns the current number of consumers active at the time this advisory was sent. + * + * Note that this is not the number of consumers active when the consumer started consuming. + * It is usually more vital to know how many consumers there are now - rather than historically + * how many there were when a consumer started. So if you create a {@link ConsumerListener} + * after many consumers have started, you will receive a ConsumerEvent for each consumer. However the + * {@link #getConsumerCount()} method will always return the current active consumer count on each event. 
+ */ + public int getConsumerCount() { + return consumerCount; + } + + public ConsumerId getConsumerId() { + return consumerId; + } + + public abstract boolean isStarted(); +} Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerEventSource.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.advisory; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQMessageConsumer; +import org.apache.activemq.Service; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.RemoveInfo; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * An object which can be used to listen to the number of active consumers + * available on a given destination. + * + * @version $Revision: 669263 $ + */ +public class ConsumerEventSource implements Service, MessageListener { + private static final Log LOG = LogFactory.getLog(ConsumerEventSource.class); + + private final Connection connection; + private final ActiveMQDestination destination; + private ConsumerListener listener; + private AtomicBoolean started = new AtomicBoolean(false); + private AtomicInteger consumerCount = new AtomicInteger(); + private Session session; + private ActiveMQMessageConsumer consumer; + + public ConsumerEventSource(Connection connection, Destination destination) throws JMSException { + this.connection = connection; + this.destination = ActiveMQDestination.transform(destination); + } + + public void setConsumerListener(ConsumerListener listener) { + this.listener = listener; + } + + public String getConsumerId() { + return consumer != null ? 
consumer.getConsumerId().toString() : "NOT_SET"; + } + + public void start() throws Exception { + if (started.compareAndSet(false, true)) { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(destination); + consumer = (ActiveMQMessageConsumer) session.createConsumer(advisoryTopic); + consumer.setMessageListener(this); + } + } + + public void stop() throws Exception { + if (started.compareAndSet(true, false)) { + if (session != null) { + session.close(); + } + } + } + + public void onMessage(Message message) { + if (message instanceof ActiveMQMessage) { + ActiveMQMessage activeMessage = (ActiveMQMessage)message; + Object command = activeMessage.getDataStructure(); + int count = 0; + if (command instanceof ConsumerInfo) { + count = consumerCount.incrementAndGet(); + count = extractConsumerCountFromMessage(message, count); + fireConsumerEvent(new ConsumerStartedEvent(this, destination, (ConsumerInfo)command, count)); + } else if (command instanceof RemoveInfo) { + RemoveInfo removeInfo = (RemoveInfo)command; + if (removeInfo.isConsumerRemove()) { + count = consumerCount.decrementAndGet(); + count = extractConsumerCountFromMessage(message, count); + fireConsumerEvent(new ConsumerStoppedEvent(this, destination, (ConsumerId)removeInfo.getObjectId(), count)); + } + } else { + LOG.warn("Unknown command: " + command); + } + } else { + LOG.warn("Unknown message type: " + message + ". Message ignored"); + } + } + + /** + * Lets rely by default on the broker telling us what the consumer count is + * as it can ensure that we are up to date at all times and have not + * received messages out of order etc. 
+ */ + protected int extractConsumerCountFromMessage(Message message, int count) { + try { + Object value = message.getObjectProperty("consumerCount"); + if (value instanceof Number) { + Number n = (Number)value; + return n.intValue(); + } + LOG.warn("No consumerCount header available on the message: " + message); + } catch (Exception e) { + LOG.warn("Failed to extract consumerCount from message: " + message + ".Reason: " + e, e); + } + return count; + } + + protected void fireConsumerEvent(ConsumerEvent event) { + if (listener != null) { + listener.onConsumerEvent(event); + } + } + +} Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerListener.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerListener.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerListener.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerListener.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +/** + * Listen to the changes in the number of active consumers available for a given destination. + * + * @version $Revision: 564271 $ + */ +public interface ConsumerListener { + + void onConsumerEvent(ConsumerEvent event); +} Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStartedEvent.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStartedEvent.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStartedEvent.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStartedEvent.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.advisory; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConsumerInfo; + +/** + * An event when a new consumer has started. + * + * @version $Revision: 564271 $ + */ +public class ConsumerStartedEvent extends ConsumerEvent { + + private static final long serialVersionUID = 5088138839609391074L; + + private final transient ConsumerInfo consumerInfo; + + public ConsumerStartedEvent(ConsumerEventSource source, ActiveMQDestination destination, ConsumerInfo consumerInfo, int count) { + super(source, destination, consumerInfo.getConsumerId(), count); + this.consumerInfo = consumerInfo; + } + + public boolean isStarted() { + return true; + } + + /** + * @return details of the subscription + */ + public ConsumerInfo getConsumerInfo() { + return consumerInfo; + } +} Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStoppedEvent.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStoppedEvent.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStoppedEvent.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ConsumerStoppedEvent.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConsumerId; + +/** + * An event generated when a consumer stops. + * + * @version $Revision: 563921 $ + */ +public class ConsumerStoppedEvent extends ConsumerEvent { + + private static final long serialVersionUID = 5378835541037193206L; + + public ConsumerStoppedEvent(ConsumerEventSource source, ActiveMQDestination destination, ConsumerId consumerId, int count) { + super(source, destination, consumerId, count); + } + + public boolean isStarted() { + return false; + } + +} Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationEvent.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationEvent.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationEvent.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationEvent.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +import java.util.EventObject; + +import javax.jms.Destination; + +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.ActiveMQDestination; + +/** + * An event caused when a destination is created or deleted + * + * @version $Revision: 634277 $ + */ +public class DestinationEvent extends EventObject { + private static final long serialVersionUID = 2442156576867593780L; + private DestinationInfo destinationInfo; + + public DestinationEvent(DestinationSource source, DestinationInfo destinationInfo) { + super(source); + this.destinationInfo = destinationInfo; + } + + public ActiveMQDestination getDestination() { + return getDestinationInfo().getDestination(); + } + + public boolean isAddOperation() { + return getDestinationInfo().isAddOperation(); + } + + public long getTimeout() { + return getDestinationInfo().getTimeout(); + } + + public boolean isRemoveOperation() { + return getDestinationInfo().isRemoveOperation(); + } + + public DestinationInfo getDestinationInfo() { + return destinationInfo; + } +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationListener.java URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationListener.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationListener.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationListener.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.advisory; + +/** + * Listen to the changes in destinations being created or destroyed + * + * @version $Revision: 634277 $ + */ +public interface DestinationListener { + void onDestinationEvent(DestinationEvent event); +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationSource.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationSource.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationSource.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/DestinationSource.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.advisory; + +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.activemq.Service; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.command.ActiveMQTempTopic; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.DestinationInfo; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A helper class which keeps track of the Destinations available in a broker and allows you to listen to them + * being created or deleted. 
+ * + * @version $Revision: 681153 $ + */ +public class DestinationSource implements MessageListener { + private static final Log LOG = LogFactory.getLog(ConsumerEventSource.class); + private AtomicBoolean started = new AtomicBoolean(false); + private final Connection connection; + private Session session; + private MessageConsumer queueConsumer; + private MessageConsumer topicConsumer; + private MessageConsumer tempTopicConsumer; + private MessageConsumer tempQueueConsumer; + private Set queues = new CopyOnWriteArraySet(); + private Set topics = new CopyOnWriteArraySet(); + private Set temporaryQueues = new CopyOnWriteArraySet(); + private Set temporaryTopics = new CopyOnWriteArraySet(); + private DestinationListener listener; + + public DestinationSource(Connection connection) throws JMSException { + this.connection = connection; + } + + public DestinationListener getListener() { + return listener; + } + + public void setDestinationListener(DestinationListener listener) { + this.listener = listener; + } + + /** + * Returns the current queues available on the broker + */ + public Set getQueues() { + return queues; + } + + /** + * Returns the current topics on the broker + */ + public Set getTopics() { + return topics; + } + + /** + * Returns the current temporary topics available on the broker + */ + public Set getTemporaryQueues() { + return temporaryQueues; + } + + /** + * Returns the current temporary queues available on the broker + */ + public Set getTemporaryTopics() { + return temporaryTopics; + } + + public void start() throws JMSException { + if (started.compareAndSet(false, true)) { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + queueConsumer = session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC); + queueConsumer.setMessageListener(this); + + topicConsumer = session.createConsumer(AdvisorySupport.TOPIC_ADVISORY_TOPIC); + topicConsumer.setMessageListener(this); + + tempQueueConsumer = 
session.createConsumer(AdvisorySupport.TEMP_QUEUE_ADVISORY_TOPIC); + tempQueueConsumer.setMessageListener(this); + + tempTopicConsumer = session.createConsumer(AdvisorySupport.TEMP_TOPIC_ADVISORY_TOPIC); + tempTopicConsumer.setMessageListener(this); + } + } + + public void stop() throws JMSException { + if (started.compareAndSet(true, false)) { + if (session != null) { + session.close(); + } + } + } + + public void onMessage(Message message) { + if (message instanceof ActiveMQMessage) { + ActiveMQMessage activeMessage = (ActiveMQMessage) message; + Object command = activeMessage.getDataStructure(); + if (command instanceof DestinationInfo) { + DestinationInfo destinationInfo = (DestinationInfo) command; + DestinationEvent event = new DestinationEvent(this, destinationInfo); + fireDestinationEvent(event); + } + else { + LOG.warn("Unknown dataStructure: " + command); + } + } + else { + LOG.warn("Unknown message type: " + message + ". Message ignored"); + } + } + + protected void fireDestinationEvent(DestinationEvent event) { + // now lets update the data structures + ActiveMQDestination destination = event.getDestination(); + boolean add = event.isAddOperation(); + if (destination instanceof ActiveMQQueue) { + ActiveMQQueue queue = (ActiveMQQueue) destination; + if (add) { + queues.add(queue); + } + else { + queues.remove(queue); + } + } + else if (destination instanceof ActiveMQTopic) { + ActiveMQTopic topic = (ActiveMQTopic) destination; + if (add) { + topics.add(topic); + } + else { + topics.remove(topic); + } + } + else if (destination instanceof ActiveMQTempQueue) { + ActiveMQTempQueue queue = (ActiveMQTempQueue) destination; + if (add) { + temporaryQueues.add(queue); + } + else { + temporaryQueues.remove(queue); + } + } + else if (destination instanceof ActiveMQTempTopic) { + ActiveMQTempTopic topic = (ActiveMQTempTopic) destination; + if (add) { + temporaryTopics.add(topic); + } + else { + temporaryTopics.remove(topic); + } + } + else { + LOG.warn("Unknown 
destination type: " + destination); + } + if (listener != null) { + listener.onDestinationEvent(event); + } + } +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEvent.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEvent.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEvent.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEvent.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +import java.util.EventObject; + +import javax.jms.Destination; + +import org.apache.activemq.command.ProducerId; + +/** + * An event when the number of producers on a given destination changes. 
+ * + * @version $Revision: 359679 $ + */ +public abstract class ProducerEvent extends EventObject { + private static final long serialVersionUID = 2442156576867593780L; + private final Destination destination; + private final ProducerId producerId; + private final int producerCount; + + public ProducerEvent(ProducerEventSource source, Destination destination, ProducerId producerId, int producerCount) { + super(source); + this.destination = destination; + this.producerId = producerId; + this.producerCount = producerCount; + } + + public ProducerEventSource getAdvisor() { + return (ProducerEventSource) getSource(); + } + + public Destination getDestination() { + return destination; + } + + /** + * Returns the current number of producers active at the time this advisory was sent. + * + */ + public int getProducerCount() { + return producerCount; + } + + public ProducerId getProducerId() { + return producerId; + } + + public abstract boolean isStarted(); +} Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/advisory/ProducerEventSource.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.activemq.Service; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveInfo; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * An object which can be used to listen to the number of active producers + * available on a given destination. 
+ * + * @version $Revision: 359679 $ + */ +public class ProducerEventSource implements Service, MessageListener { + private static final Log LOG = LogFactory.getLog(ProducerEventSource.class); + + private final Connection connection; + private final ActiveMQDestination destination; + private ProducerListener listener; + private AtomicBoolean started = new AtomicBoolean(false); + private AtomicInteger producerCount = new AtomicInteger(); + private Session session; + private MessageConsumer consumer; + + public ProducerEventSource(Connection connection, Destination destination) throws JMSException { + this.connection = connection; + this.destination = ActiveMQDestination.transform(destination); + } + + public void setProducerListener(ProducerListener listener) { + this.listener = listener; + } + + public void start() throws Exception { + if (started.compareAndSet(false, true)) { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQTopic advisoryTopic = AdvisorySupport.getProducerAdvisoryTopic(destination); + consumer = session.createConsumer(advisoryTopic); + consumer.setMessageListener(this); + } + } + + public void stop() throws Exception { + if (started.compareAndSet(true, false)) { + if (session != null) { + session.close(); + } + } + } + + public void onMessage(Message message) { + if (message instanceof ActiveMQMessage) { + ActiveMQMessage activeMessage = (ActiveMQMessage)message; + Object command = activeMessage.getDataStructure(); + int count = 0; + if (command instanceof ProducerInfo) { + count = producerCount.incrementAndGet(); + count = extractProducerCountFromMessage(message, count); + fireProducerEvent(new ProducerStartedEvent(this, destination, (ProducerInfo)command, count)); + } else if (command instanceof RemoveInfo) { + RemoveInfo removeInfo = (RemoveInfo)command; + if (removeInfo.isProducerRemove()) { + count = producerCount.decrementAndGet(); + count = extractProducerCountFromMessage(message, count); + 
fireProducerEvent(new ProducerStoppedEvent(this, destination, (ProducerId)removeInfo.getObjectId(), count)); + } + } else { + LOG.warn("Unknown command: " + command); + } + } else { + LOG.warn("Unknown message type: " + message + ". Message ignored"); + } + } + + protected int extractProducerCountFromMessage(Message message, int count) { + try { + Object value = message.getObjectProperty("producerCount"); + if (value instanceof Number) { + Number n = (Number)value; + return n.intValue(); + } + LOG.warn("No producerCount header available on the message: " + message); + } catch (Exception e) { + LOG.warn("Failed to extract producerCount from message: " + message + ".Reason: " + e, e); + } + return count; + } + + protected void fireProducerEvent(ProducerEvent event) { + if (listener != null) { + listener.onProducerEvent(event); + } + } + +}