From commits-return-10960-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Tue Jun 02 21:30:09 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 12293 invoked from network); 2 Jun 2009 21:30:09 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 2 Jun 2009 21:30:09 -0000 Received: (qmail 96281 invoked by uid 500); 2 Jun 2009 21:30:21 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 96261 invoked by uid 500); 2 Jun 2009 21:30:21 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 96252 invoked by uid 99); 2 Jun 2009 21:30:21 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Jun 2009 21:30:21 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Jun 2009 21:30:02 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id BD52923888E9; Tue, 2 Jun 2009 21:29:40 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r781177 [5/11] - in /activemq/sandbox/activemq-flow: activemq-bio/ activemq-bio/src/main/java/org/ activemq-bio/src/main/java/org/apache/ activemq-bio/src/main/java/org/apache/activemq/ activemq-bio/src/main/java/org/apache/activemq/transpo... 
Date: Tue, 02 Jun 2009 21:29:35 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090602212940.BD52923888E9@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,1184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTempDestination; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.filter.FilterException; +import org.apache.activemq.management.JMSConsumerStatsImpl; +import org.apache.activemq.management.StatsCapable; +import org.apache.activemq.management.StatsImpl; +import org.apache.activemq.selector.SelectorParser; +import org.apache.activemq.thread.Scheduler; +import org.apache.activemq.transaction.Synchronization; +import org.apache.activemq.util.Callback; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A client uses a MessageConsumer object to receive messages + * from a destination. 
A MessageConsumer object is created by + * passing a Destination object to a message-consumer creation + * method supplied by a session. + *

+ * MessageConsumer is the parent interface for all message + * consumers. + *

+ * A message consumer can be created with a message selector. A message selector + * allows the client to restrict the messages delivered to the message consumer + * to those that match the selector. + *

+ * A client may either synchronously receive a message consumer's messages or + * have the consumer asynchronously deliver them as they arrive. + *

+ * For synchronous receipt, a client can request the next message from a message + * consumer using one of its receive methods. There are several + * variations of receive that allow a client to poll or wait for + * the next message. + *

+ * For asynchronous delivery, a client can register a + * MessageListener object with a message consumer. As messages + * arrive at the message consumer, it delivers them by calling the + * MessageListener's + * onMessage method. + *

+ * It is a client programming error for a MessageListener to + * throw an exception. + * + * @version $Revision: 1.22 $ + * @see javax.jms.MessageConsumer + * @see javax.jms.QueueReceiver + * @see javax.jms.TopicSubscriber + * @see javax.jms.Session + */ +public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher { + + private static final Log LOG = LogFactory.getLog(ActiveMQMessageConsumer.class); + protected static final Scheduler scheduler = Scheduler.getInstance(); + protected final ActiveMQSession session; + protected final ConsumerInfo info; + + // These are the messages waiting to be delivered to the client + private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel(); + + // These are the messages that were delivered to the consumer but that have + // not been acknowledged. The list is kept in reverse order (newest first) since we + // always walk the list in reverse order. + private final LinkedList deliveredMessages = new LinkedList(); + private int deliveredCounter; + private int additionalWindowSize; + private long redeliveryDelay; + private int ackCounter; + private int dispatchedCount; + private final AtomicReference messageListener = new AtomicReference(); + private JMSConsumerStatsImpl stats; + + private final String selector; + private boolean synchronizationRegistered; + private AtomicBoolean started = new AtomicBoolean(false); + + private MessageAvailableListener availableListener; + + private RedeliveryPolicy redeliveryPolicy; + private boolean optimizeAcknowledge; + private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); + private ExecutorService executorService; + private MessageTransformer transformer; + private boolean clearDispatchList; + + private MessageAck pendingAck; + private long lastDeliveredSequenceId; + + private IOException failureError; + + /** + * Create a MessageConsumer + * + * @param session + * @param dest + * @param name + * @param selector + * @param prefetch 
+ * @param maximumPendingMessageCount TODO + * @param noLocal + * @param browser + * @param dispatchAsync + * @param messageListener + * @throws JMSException + */ + public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, + String name, String selector, int prefetch, + int maximumPendingMessageCount, boolean noLocal, boolean browser, + boolean dispatchAsync, MessageListener messageListener) throws JMSException { + if (dest == null) { + throw new InvalidDestinationException("Don't understand null destinations"); + } else if (dest.getPhysicalName() == null) { + throw new InvalidDestinationException("The destination object was not given a physical name."); + } else if (dest.isTemporary()) { + String physicalName = dest.getPhysicalName(); + + if (physicalName == null) { + throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest); + } + + String connectionID = session.connection.getConnectionInfo().getConnectionId().getValue(); + + if (physicalName.indexOf(connectionID) < 0) { + throw new InvalidDestinationException( + "Cannot use a Temporary destination from another Connection"); + } + + if (session.connection.isDeleted(dest)) { + throw new InvalidDestinationException( + "Cannot use a Temporary destination that has been deleted"); + } + if (prefetch < 0) { + throw new JMSException("Cannot have a prefetch size less than zero"); + } + } + + this.session = session; + this.redeliveryPolicy = session.connection.getRedeliveryPolicy(); + setTransformer(session.getTransformer()); + + this.info = new ConsumerInfo(consumerId); + this.info.setExclusive(this.session.connection.isExclusiveConsumer()); + this.info.setSubscriptionName(name); + this.info.setPrefetchSize(prefetch); + this.info.setCurrentPrefetchSize(prefetch); + this.info.setMaximumPendingMessageLimit(maximumPendingMessageCount); + this.info.setNoLocal(noLocal); + this.info.setDispatchAsync(dispatchAsync); + 
this.info.setRetroactive(this.session.connection.isUseRetroactiveConsumer()); + this.info.setSelector(null); + + // Allows the options on the destination to configure the consumerInfo + if (dest.getOptions() != null) { + Map options = new HashMap(dest.getOptions()); + IntrospectionSupport.setProperties(this.info, options, "consumer."); + } + + this.info.setDestination(dest); + this.info.setBrowser(browser); + try { + if (selector != null && selector.trim().length() != 0) { + // Validate the selector + SelectorParser.parse(selector); + this.info.setSelector(selector); + this.selector = selector; + } else if (info.getSelector() != null) { + // Validate the selector + SelectorParser.parse(this.info.getSelector()); + this.selector = this.info.getSelector(); + } else { + this.selector = null; + } + } catch (FilterException e) { + throw JMSExceptionSupport.createInvalidSelectorException(e); + } + + this.stats = new JMSConsumerStatsImpl(session.getSessionStats(), dest); + this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() + && !info.isBrowser(); + this.info.setOptimizedAcknowledge(this.optimizeAcknowledge); + + if (messageListener != null) { + setMessageListener(messageListener); + } + try { + this.session.addConsumer(this); + this.session.syncSendPacket(info); + } catch (JMSException e) { + this.session.removeConsumer(this); + throw e; + } + + if (session.connection.isStarted()) { + start(); + } + } + + public StatsImpl getStats() { + return stats; + } + + public JMSConsumerStatsImpl getConsumerStats() { + return stats; + } + + public RedeliveryPolicy getRedeliveryPolicy() { + return redeliveryPolicy; + } + + /** + * Sets the redelivery policy used when messages are redelivered + */ + public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { + this.redeliveryPolicy = redeliveryPolicy; + } + + public MessageTransformer getTransformer() { + return transformer; + } + + /** + * Sets the transformer used to 
transform messages received by this + * consumer before they are handed to the application + */ + public void setTransformer(MessageTransformer transformer) { + this.transformer = transformer; + } + + /** + * @return the id of this consumer, as held by its ConsumerInfo + */ + public ConsumerId getConsumerId() { + return info.getConsumerId(); + } + + /** + * @return the consumer (subscription) name - used for durable consumers + */ + public String getConsumerName() { + return this.info.getSubscriptionName(); + } + + /** + * @return true if this consumer does not accept locally produced messages + */ + protected boolean isNoLocal() { + return info.isNoLocal(); + } + + /** + * Indicates whether this consumer is a browser. + * + * @return true if this consumer is a browser + */ + protected boolean isBrowser() { + return info.isBrowser(); + } + + /** + * @return the destination held by this consumer's ConsumerInfo + */ + protected ActiveMQDestination getDestination() { + return info.getDestination(); + } + + /** + * @return the prefetch size held by this consumer's ConsumerInfo + */ + public int getPrefetchNumber() { + return info.getPrefetchSize(); + } + + /** + * @return true if this is a durable topic subscriber, i.e. it has a + * subscription name and its destination is a topic + */ + public boolean isDurableSubscriber() { + return info.getSubscriptionName() != null && info.getDestination().isTopic(); + } + + /** + * Gets this message consumer's message selector expression. + * + * @return this message consumer's message selector, or null if no message + * selector exists for the message consumer (that is, if the message + * selector was not set or was set to null or the empty string) + * @throws JMSException if this message consumer has been closed + */ + public String getMessageSelector() throws JMSException { + checkClosed(); + return selector; + } + + /** + * Gets the message consumer's MessageListener. + * + * @return the listener for the message consumer, or null if no listener is + * set + * @throws JMSException if the JMS provider fails to get the message + * listener due to some internal error. 
+ * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener) + */ + public MessageListener getMessageListener() throws JMSException { + checkClosed(); + return this.messageListener.get(); + } + + /** + * Sets the message consumer's MessageListener. + *

+ * Setting the message listener to null is the equivalent of unsetting the + * message listener for the message consumer. + *

+ * The effect of calling MessageConsumer.setMessageListener + * while messages are being consumed by an existing listener or the consumer + * is being used to consume messages synchronously is undefined. + * + * @param listener the listener to which the messages are to be delivered + * @throws JMSException if this consumer is closed, or if its prefetch size + * is zero (not supported for asynchronous consumers) + * @see javax.jms.MessageConsumer#getMessageListener + */ + public void setMessageListener(MessageListener listener) throws JMSException { + checkClosed(); + if (info.getPrefetchSize() == 0) { + throw new JMSException( + "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); + } + if (listener != null) { + boolean wasRunning = session.isRunning(); + if (wasRunning) { + session.stop(); + } + + this.messageListener.set(listener); + session.redispatch(this, unconsumedMessages); + + if (wasRunning) { + session.start(); + } + } else { + this.messageListener.set(null); + } + } + + public MessageAvailableListener getAvailableListener() { + return availableListener; + } + + /** + * Sets the listener used to notify synchronous consumers that there is a + * message available so that the {@link MessageConsumer#receiveNoWait()} can + * be called. + */ + public void setAvailableListener(MessageAvailableListener availableListener) { + this.availableListener = availableListener; + } + + /** + * Used to get an enqueued message from the unconsumedMessages list. The + * amount of time this method blocks is based on the timeout value. - if + * timeout==-1 then it blocks until a message is received. - if timeout==0 + * then it tries to not block at all, it returns a message if it is + * available - if timeout>0 then it blocks up to timeout amount of time. + * Expired messages will be consumed by this method. + * + * @throws JMSException + * @return null if we timeout or if the consumer is closed. 
+ */ + private MessageDispatch dequeue(long timeout) throws JMSException { + try { + long deadline = 0; + if (timeout > 0) { + deadline = System.currentTimeMillis() + timeout; + } + while (true) { + MessageDispatch md = unconsumedMessages.dequeue(timeout); + if (md == null) { + if (timeout > 0 && !unconsumedMessages.isClosed()) { + timeout = Math.max(deadline - System.currentTimeMillis(), 0); + } else { + if (failureError != null) { + throw JMSExceptionSupport.create(failureError); + } else { + return null; + } + } + } else if (md.getMessage() == null) { + return null; + } else if (md.getMessage().isExpired()) { + if (LOG.isDebugEnabled()) { + LOG.debug(getConsumerId() + " received expired message: " + md); + } + beforeMessageIsConsumed(md); + afterMessageIsConsumed(md, true); + if (timeout > 0) { + timeout = Math.max(deadline - System.currentTimeMillis(), 0); + } + } else { + if (LOG.isTraceEnabled()) { + LOG.trace(getConsumerId() + " received message: " + md); + } + return md; + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw JMSExceptionSupport.create(e); + } + } + + /** + * Receives the next message produced for this message consumer. + *

+ * This call blocks indefinitely until a message is produced or until this + * message consumer is closed. + *

+ * If this receive is done within a transaction, the consumer + * retains the message until the transaction commits. + * + * @return the next message produced for this message consumer, or null if + * this message consumer is concurrently closed + */ + public Message receive() throws JMSException { + checkClosed(); + checkMessageListener(); + + sendPullCommand(0); + MessageDispatch md = dequeue(-1); + if (md == null) { + return null; + } + + beforeMessageIsConsumed(md); + afterMessageIsConsumed(md, false); + + return createActiveMQMessage(md); + } + + /** + * @param md + * @return + */ + private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException { + ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy(); + if (transformer != null) { + Message transformedMessage = transformer.consumerTransform(session, this, m); + if (transformedMessage != null) { + m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection); + } + } + if (session.isClientAcknowledge()) { + m.setAcknowledgeCallback(new Callback() { + public void execute() throws Exception { + session.checkClosed(); + session.acknowledge(); + } + }); + }else if (session.isIndividualAcknowledge()) { + m.setAcknowledgeCallback(new Callback() { + public void execute() throws Exception { + session.checkClosed(); + acknowledge(md); + } + }); + } + return m; + } + + /** + * Receives the next message that arrives within the specified timeout + * interval. + *

+ * This call blocks until a message arrives, the timeout expires, or this + * message consumer is closed. A timeout of zero never + * expires, and the call blocks indefinitely. + * + * @param timeout the timeout value (in milliseconds), a time out of zero + * never expires. + * @return the next message produced for this message consumer, or null if + * the timeout expires or this message consumer is concurrently + * closed + */ + public Message receive(long timeout) throws JMSException { + checkClosed(); + checkMessageListener(); + if (timeout == 0) { + return this.receive(); + + } + + sendPullCommand(timeout); + while (timeout > 0) { + + MessageDispatch md; + if (info.getPrefetchSize() == 0) { + md = dequeue(-1); // We let the broker let us know when we + // timeout. + } else { + md = dequeue(timeout); + } + + if (md == null) { + return null; + } + + beforeMessageIsConsumed(md); + afterMessageIsConsumed(md, false); + return createActiveMQMessage(md); + } + return null; + } + + /** + * Receives the next message if one is immediately available. + * + * @return the next message produced for this message consumer, or null if + * one is not available + * @throws JMSException if the JMS provider fails to receive the next + * message due to some internal error. + */ + public Message receiveNoWait() throws JMSException { + checkClosed(); + checkMessageListener(); + sendPullCommand(-1); + + MessageDispatch md; + if (info.getPrefetchSize() == 0) { + md = dequeue(-1); // We let the broker let us know when we + // timeout. + } else { + md = dequeue(0); + } + + if (md == null) { + return null; + } + + beforeMessageIsConsumed(md); + afterMessageIsConsumed(md, false); + return createActiveMQMessage(md); + } + + /** + * Closes the message consumer. + *

+ * Since a provider may allocate some resources on behalf of a + * MessageConsumer + * outside the Java virtual machine, clients should close them when they are + * not needed. Relying on garbage collection to eventually reclaim these + * resources may not be timely enough. + *

+ * This call blocks until a receive or message listener in + * progress has completed. A blocked message consumer receive + * call returns null when this message consumer is closed. + * + * @throws JMSException if the JMS provider fails to close the consumer due + * to some internal error. + */ + public void close() throws JMSException { + if (!unconsumedMessages.isClosed()) { + if (session.getTransactionContext().isInTransaction()) { + session.getTransactionContext().addSynchronization(new Synchronization() { + public void afterCommit() throws Exception { + doClose(); + } + + public void afterRollback() throws Exception { + doClose(); + } + }); + } else { + doClose(); + } + } + } + + void doClose() throws JMSException { + dispose(); + RemoveInfo removeCommand = info.createRemoveCommand(); + removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); + this.session.asyncSendPacket(removeCommand); + } + + void clearMessagesInProgress() { + // we are called from inside the transport reconnection logic + // which involves us clearing all the connections' consumers + // dispatch lists and clearing them + // so rather than trying to grab a mutex (which could be already + // owned by the message listener calling the send) we will just set + // a flag so that the list can be cleared as soon as the + // dispatch thread is ready to flush the dispatch list + clearDispatchList = true; + } + + void deliverAcks() { + MessageAck ack = null; + if (deliveryingAcknowledgements.compareAndSet(false, true)) { + if (session.isAutoAcknowledge()) { + synchronized(deliveredMessages) { + ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); + if (ack != null) { + deliveredMessages.clear(); + ackCounter = 0; + } + } + } else if (pendingAck != null && pendingAck.isStandardAck()) { + ack = pendingAck; + pendingAck = null; + } + if (ack != null) { + final MessageAck ackToSend = ack; + + if (executorService == null) { + executorService = 
Executors.newSingleThreadExecutor(); + } + executorService.submit(new Runnable() { + public void run() { + try { + session.sendAck(ackToSend,true); + } catch (JMSException e) { + LOG.error(getConsumerId() + " failed to delivered acknowledgements", e); + } finally { + deliveryingAcknowledgements.set(false); + } + } + }); + } else { + deliveryingAcknowledgements.set(false); + } + } + } + + public void dispose() throws JMSException { + if (!unconsumedMessages.isClosed()) { + + // Do we have any acks we need to send out before closing? + // Ack any delivered messages now. + if (!session.getTransacted()) { + deliverAcks(); + if (session.isDupsOkAcknowledge()) { + acknowledge(); + } + } + if (executorService != null) { + executorService.shutdown(); + try { + executorService.awaitTermination(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + if (session.isClientAcknowledge()) { + if (!this.info.isBrowser()) { + // rollback duplicates that aren't acknowledged + List tmp = null; + synchronized (this.deliveredMessages) { + tmp = new ArrayList(this.deliveredMessages); + } + for (MessageDispatch old : tmp) { + this.session.connection.rollbackDuplicate(this, old.getMessage()); + } + tmp.clear(); + } + } + if (!session.isTransacted()) { + synchronized(deliveredMessages) { + deliveredMessages.clear(); + } + } + List list = unconsumedMessages.removeAll(); + if (!this.info.isBrowser()) { + for (MessageDispatch old : list) { + // ensure we don't filter this as a duplicate + session.connection.rollbackDuplicate(this, old.getMessage()); + } + } + unconsumedMessages.close(); + this.session.removeConsumer(this); + } + } + + /** + * @throws IllegalStateException + */ + protected void checkClosed() throws IllegalStateException { + if (unconsumedMessages.isClosed()) { + throw new IllegalStateException("The Consumer is closed"); + } + } + + /** + * If we have a zero prefetch specified then send a pull command to the + * broker to 
pull a message we are about to receive + */ + protected void sendPullCommand(long timeout) throws JMSException { + if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) { + MessagePull messagePull = new MessagePull(); + messagePull.configure(info); + messagePull.setTimeout(timeout); + session.asyncSendPacket(messagePull); + } + } + + protected void checkMessageListener() throws JMSException { + session.checkMessageListener(); + } + + protected void setOptimizeAcknowledge(boolean value) { + if (optimizeAcknowledge && !value) { + deliverAcks(); + } + optimizeAcknowledge = value; + } + + protected void setPrefetchSize(int prefetch) { + deliverAcks(); + this.info.setCurrentPrefetchSize(prefetch); + } + + private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException { + md.setDeliverySequenceId(session.getNextDeliveryId()); + lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); + if (!session.isDupsOkAcknowledge()) { + synchronized(deliveredMessages) { + deliveredMessages.addFirst(md); + } + if (session.getTransacted()) { + ackLater(md, MessageAck.DELIVERED_ACK_TYPE); + } + } + } + + private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException { + if (unconsumedMessages.isClosed()) { + return; + } + if (messageExpired) { + synchronized (deliveredMessages) { + deliveredMessages.remove(md); + } + stats.getExpiredMessageCount().increment(); + ackLater(md, MessageAck.DELIVERED_ACK_TYPE); + } else { + stats.onMessage(); + if (session.getTransacted()) { + // Do nothing. 
+ } else if (session.isAutoAcknowledge()) { + if (deliveryingAcknowledgements.compareAndSet(false, true)) { + synchronized (deliveredMessages) { + if (!deliveredMessages.isEmpty()) { + if (optimizeAcknowledge) { + ackCounter++; + if (ackCounter >= (info.getCurrentPrefetchSize() * .65)) { + MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); + if (ack != null) { + deliveredMessages.clear(); + ackCounter = 0; + session.sendAck(ack); + } + } + } else { + MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); + if (ack!=null) { + deliveredMessages.clear(); + session.sendAck(ack); + } + } + } + } + deliveryingAcknowledgements.set(false); + } + } else if (session.isDupsOkAcknowledge()) { + ackLater(md, MessageAck.STANDARD_ACK_TYPE); + } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { + ackLater(md, MessageAck.DELIVERED_ACK_TYPE); + } + else { + throw new IllegalStateException("Invalid session state."); + } + } + } + + /** + * Creates a MessageAck for all messages contained in deliveredMessages. + * Caller should hold the lock for deliveredMessages. + * + * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE) + * @return null if nothing to ack. 
+ */ + private MessageAck makeAckForAllDeliveredMessages(byte type) { + synchronized (deliveredMessages) { + if (deliveredMessages.isEmpty()) + return null; + + MessageDispatch md = deliveredMessages.getFirst(); + MessageAck ack = new MessageAck(md, type, deliveredMessages.size()); + ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId()); + return ack; + } + } + + private void ackLater(MessageDispatch md, byte ackType) throws JMSException { + + // Don't acknowledge now, but we may need to let the broker know the + // consumer got the message to expand the pre-fetch window + if (session.getTransacted()) { + session.doStartTransaction(); + if (!synchronizationRegistered) { + synchronizationRegistered = true; + session.getTransactionContext().addSynchronization(new Synchronization() { + public void beforeEnd() throws Exception { + acknowledge(); + synchronizationRegistered = false; + } + + public void afterCommit() throws Exception { + commit(); + synchronizationRegistered = false; + } + + public void afterRollback() throws Exception { + rollback(); + synchronizationRegistered = false; + } + }); + } + } + + deliveredCounter++; + + MessageAck oldPendingAck = pendingAck; + pendingAck = new MessageAck(md, ackType, deliveredCounter); + pendingAck.setTransactionId(session.getTransactionContext().getTransactionId()); + if( oldPendingAck==null ) { + pendingAck.setFirstMessageId(pendingAck.getLastMessageId()); + } else if ( oldPendingAck.getAckType() == pendingAck.getAckType() ) { + pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId()); + } else { + // old pending ack being superseded by ack of another type, if is is not a delivered + // ack and hence important, send it now so it is not lost. 
+ if ( !oldPendingAck.isDeliveredAck()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck); + } + session.sendAck(oldPendingAck); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck); + } + } + } + + if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) { + session.sendAck(pendingAck); + pendingAck=null; + deliveredCounter = 0; + additionalWindowSize = 0; + } + } + + /** + * Acknowledge all the messages that have been delivered to the client up to + * this point. + * + * @throws JMSException + */ + public void acknowledge() throws JMSException { + synchronized(deliveredMessages) { + // Acknowledge all messages so far. + MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); + if (ack == null) + return; // no msgs + + if (session.getTransacted()) { + session.doStartTransaction(); + ack.setTransactionId(session.getTransactionContext().getTransactionId()); + } + session.sendAck(ack); + pendingAck = null; + + // Adjust the counters + deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size()); + additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); + + if (!session.getTransacted()) { + deliveredMessages.clear(); + } + } + } + + void acknowledge(MessageDispatch md) throws JMSException { + MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1); + session.sendAck(ack); + synchronized(deliveredMessages){ + deliveredMessages.remove(md); + } + } + + public void commit() throws JMSException { + synchronized (deliveredMessages) { + deliveredMessages.clear(); + } + redeliveryDelay = 0; + } + + public void rollback() throws JMSException { + synchronized (unconsumedMessages.getMutex()) { + if (optimizeAcknowledge) { + // remove messages read but not acked at the broker yet through + // optimizeAcknowledge + if 
(!this.info.isBrowser()) { + synchronized(deliveredMessages) { + for (int i = 0; (i < deliveredMessages.size()) && (i < ackCounter); i++) { + // ensure we don't filter this as a duplicate + MessageDispatch md = deliveredMessages.removeLast(); + session.connection.rollbackDuplicate(this, md.getMessage()); + } + } + } + } + synchronized(deliveredMessages) { + if (deliveredMessages.isEmpty()) { + return; + } + + // Only increase the redelivery delay after the first redelivery.. + MessageDispatch lastMd = deliveredMessages.getFirst(); + final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter(); + if (currentRedeliveryCount > 0) { + redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); + } + MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId(); + + for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { + MessageDispatch md = iter.next(); + md.getMessage().onMessageRolledBack(); + // ensure we don't filter this as a duplicate + session.connection.rollbackDuplicate(this, md.getMessage()); + } + + if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES + && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) { + // We need to NACK the messages so that they get sent to the + // DLQ. + // Acknowledge the last message. + + MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); + ack.setFirstMessageId(firstMsgId); + session.sendAck(ack,true); + // Adjust the window size. + additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); + redeliveryDelay = 0; + } else { + + // only redelivery_ack after first delivery + if (currentRedeliveryCount > 0) { + MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size()); + ack.setFirstMessageId(firstMsgId); + session.sendAck(ack,true); + } + + // stop the delivery of messages. 
+ unconsumedMessages.stop(); + + for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { + MessageDispatch md = iter.next(); + unconsumedMessages.enqueueFirst(md); + } + + if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) { + // Start up the delivery again a little later. + scheduler.executeAfterDelay(new Runnable() { + public void run() { + try { + if (started.get()) { + start(); + } + } catch (JMSException e) { + session.connection.onAsyncException(e); + } + } + }, redeliveryDelay); + } else { + start(); + } + + } + deliveredCounter -= deliveredMessages.size(); + deliveredMessages.clear(); + } + } + if (messageListener.get() != null) { + session.redispatch(this, unconsumedMessages); + } + } + + public void dispatch(MessageDispatch md) { + MessageListener listener = this.messageListener.get(); + try { + synchronized (unconsumedMessages.getMutex()) { + if (clearDispatchList) { + // we are reconnecting so lets flush the in progress + // messages + clearDispatchList = false; + List list = unconsumedMessages.removeAll(); + if (!this.info.isBrowser()) { + for (MessageDispatch old : list) { + // ensure we don't filter this as a duplicate + session.connection.rollbackDuplicate(this, old.getMessage()); + } + } + if (pendingAck != null && pendingAck.isDeliveredAck()) { + // on resumption a pending delivered ack will be out of sync with + // re deliveries. 
+ if (LOG.isDebugEnabled()) { + LOG.debug("removing pending delivered ack on transport interupt: " + pendingAck); + } + pendingAck = null; + } + } + if (!unconsumedMessages.isClosed()) { + if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) { + if (listener != null && unconsumedMessages.isRunning()) { + ActiveMQMessage message = createActiveMQMessage(md); + beforeMessageIsConsumed(md); + try { + boolean expired = message.isExpired(); + if (!expired) { + listener.onMessage(message); + } + afterMessageIsConsumed(md, expired); + } catch (RuntimeException e) { + if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge() || session.isIndividualAcknowledge()) { + // Redeliver the message + } else { + // Transacted or Client ack: Deliver the + // next message. + afterMessageIsConsumed(md, false); + } + LOG.error(getConsumerId() + " Exception while processing message: " + e, e); + } + } else { + unconsumedMessages.enqueue(md); + if (availableListener != null) { + availableListener.onMessageAvailable(this); + } + } + } else { + // ignore duplicate + if (LOG.isDebugEnabled()) { + LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate: " + md.getMessage()); + } + // in a transaction ack delivery of duplicates to ensure prefetch extension kicks in. + // the normal ack will happen in the transaction. 
+ if (session.isTransacted()) { + ackLater(md, MessageAck.DELIVERED_ACK_TYPE); + } else { + acknowledge(md); + } + } + } + } + if (++dispatchedCount % 1000 == 0) { + dispatchedCount = 0; + Thread.yield(); + } + } catch (Exception e) { + session.connection.onClientInternalException(e); + } + } + + public int getMessageSize() { + return unconsumedMessages.size(); + } + + public void start() throws JMSException { + if (unconsumedMessages.isClosed()) { + return; + } + started.set(true); + unconsumedMessages.start(); + session.executor.wakeup(); + } + + public void stop() { + started.set(false); + unconsumedMessages.stop(); + } + + public String toString() { + return "ActiveMQMessageConsumer { value=" + info.getConsumerId() + ", started=" + started.get() + + " }"; + } + + /** + * Delivers a message to the message listener. + * + * @return + * @throws JMSException + */ + public boolean iterate() { + MessageListener listener = this.messageListener.get(); + if (listener != null) { + MessageDispatch md = unconsumedMessages.dequeueNoWait(); + if (md != null) { + try { + ActiveMQMessage message = createActiveMQMessage(md); + beforeMessageIsConsumed(md); + listener.onMessage(message); + afterMessageIsConsumed(md, false); + } catch (JMSException e) { + session.connection.onClientInternalException(e); + } + return true; + } + } + return false; + } + + public boolean isInUse(ActiveMQTempDestination destination) { + return info.getDestination().equals(destination); + } + + public long getLastDeliveredSequenceId() { + return lastDeliveredSequenceId; + } + + public IOException getFailureError() { + return failureError; + } + + public void setFailureError(IOException failureError) { + this.failureError = failureError; + } +} Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java ------------------------------------------------------------------------------ svn:executable = * Added: 
activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.Message; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ProducerAck; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.management.JMSProducerStatsImpl; +import org.apache.activemq.management.StatsCapable; +import org.apache.activemq.management.StatsImpl; +import org.apache.activemq.usage.MemoryUsage; +import org.apache.activemq.util.IntrospectionSupport; + +/** + * A client uses a MessageProducer object to send messages to a + * destination. A MessageProducer object is created by passing a + * Destination object to a message-producer creation method + * supplied by a session. + *

+ * MessageProducer is the parent interface for all message + * producers. + *

+ * A client also has the option of creating a message producer without supplying + * a destination. In this case, a destination must be provided with every send + * operation. A typical use for this kind of message producer is to send replies + * to requests using the request's JMSReplyTo destination. + *

+ * A client can specify a default delivery mode, priority, and time to live for + * messages sent by a message producer. It can also specify the delivery mode, + * priority, and time to live for an individual message. + *

+ * A client can specify a time-to-live value in milliseconds for each message it + * sends. This value defines a message expiration time that is the sum of the + * message's time-to-live and the GMT when it is sent (for transacted sends, + * this is the time the client sends the message, not the time the transaction + * is committed). + *

+ * A JMS provider should do its best to expire messages accurately; however, the + * JMS API does not define the accuracy provided. + * + * @version $Revision: 1.14 $ + * @see javax.jms.TopicPublisher + * @see javax.jms.QueueSender + * @see javax.jms.Session#createProducer + */ +public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable { + + protected ProducerInfo info; + protected boolean closed; + + private JMSProducerStatsImpl stats; + private AtomicLong messageSequence; + private long startTime; + private MessageTransformer transformer; + private MemoryUsage producerWindow; + + protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException { + super(session); + this.info = new ProducerInfo(producerId); + this.info.setWindowSize(session.connection.getProducerWindowSize()); + if (destination != null && destination.getOptions() != null) { + Map options = new HashMap(destination.getOptions()); + IntrospectionSupport.setProperties(this.info, options, "producer."); + } + this.info.setDestination(destination); + + // Enable producer window flow control if protocol > 3 and the window + // size > 0 + if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) { + producerWindow = new MemoryUsage("Producer Window: " + producerId); + producerWindow.setLimit(this.info.getWindowSize()); + producerWindow.start(); + } + + this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE; + this.defaultPriority = Message.DEFAULT_PRIORITY; + this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE; + this.startTime = System.currentTimeMillis(); + this.messageSequence = new AtomicLong(0); + this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination); + this.session.addProducer(this); + this.session.asyncSendPacket(info); + this.setSendTimeout(sendTimeout); + setTransformer(session.getTransformer()); + } + + 
public StatsImpl getStats() { + return stats; + } + + public JMSProducerStatsImpl getProducerStats() { + return stats; + } + + /** + * Gets the destination associated with this MessageProducer. + * + * @return this producer's Destination/ + * @throws JMSException if the JMS provider fails to close the producer due to + * some internal error. + * @since 1.1 + */ + public Destination getDestination() throws JMSException { + checkClosed(); + return this.info.getDestination(); + } + + /** + * Closes the message producer. + *

+ * Since a provider may allocate some resources on behalf of a + * MessageProducer + * outside the Java virtual machine, clients should close them when they are + * not needed. Relying on garbage collection to eventually reclaim these + * resources may not be timely enough. + * + * @throws JMSException if the JMS provider fails to close the producer due + * to some internal error. + */ + public void close() throws JMSException { + if (!closed) { + dispose(); + this.session.asyncSendPacket(info.createRemoveCommand()); + } + } + + public void dispose() { + if (!closed) { + this.session.removeProducer(this); + if (producerWindow != null) { + producerWindow.stop(); + } + closed = true; + } + } + + /** + * Check if the instance of this producer has been closed. + * + * @throws IllegalStateException + */ + protected void checkClosed() throws IllegalStateException { + if (closed) { + throw new IllegalStateException("The producer is closed"); + } + } + + /** + * Sends a message to a destination for an unidentified message producer, + * specifying delivery mode, priority and time to live. + *

+ * Typically, a message producer is assigned a destination at creation time; + * however, the JMS API also supports unidentified message producers, which + * require that the destination be supplied every time a message is sent. + * + * @param destination the destination to send this message to + * @param message the message to send + * @param deliveryMode the delivery mode to use + * @param priority the priority for this message + * @param timeToLive the message's lifetime (in milliseconds) + * @throws JMSException if the JMS provider fails to send the message due to + * some internal error. + * @throws UnsupportedOperationException if an invalid destination is + * specified. + * @throws InvalidDestinationException if a client uses this method with an + * invalid destination. + * @see javax.jms.Session#createProducer + * @since 1.1 + */ + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + checkClosed(); + if (destination == null) { + if (info.getDestination() == null) { + throw new UnsupportedOperationException("A destination must be specified."); + } + throw new InvalidDestinationException("Don't understand null destinations"); + } + + ActiveMQDestination dest; + if (destination == info.getDestination()) { + dest = (ActiveMQDestination)destination; + } else if (info.getDestination() == null) { + dest = ActiveMQDestination.transform(destination); + } else { + throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); + } + if (dest == null) { + throw new JMSException("No destination specified"); + } + + if (transformer != null) { + Message transformedMessage = transformer.producerTransform(session, this, message); + if (transformedMessage != null) { + message = transformedMessage; + } + } + + if (producerWindow != null) { + try { + producerWindow.waitForSpace(); + } catch (InterruptedException e) { + throw new 
JMSException("Send aborted due to thread interrupt."); + } + } + + this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout); + + stats.onMessage(); + } + + public MessageTransformer getTransformer() { + return transformer; + } + + /** + * Sets the transformer used to transform messages before they are sent on + * to the JMS bus + */ + public void setTransformer(MessageTransformer transformer) { + this.transformer = transformer; + } + + /** + * @return the time in milli second when this object was created. + */ + protected long getStartTime() { + return this.startTime; + } + + /** + * @return Returns the messageSequence. + */ + protected long getMessageSequence() { + return messageSequence.incrementAndGet(); + } + + /** + * @param messageSequence The messageSequence to set. + */ + protected void setMessageSequence(AtomicLong messageSequence) { + this.messageSequence = messageSequence; + } + + /** + * @return Returns the info. + */ + protected ProducerInfo getProducerInfo() { + return this.info != null ? 
this.info : null; + } + + /** + * @param info The info to set + */ + protected void setProducerInfo(ProducerInfo info) { + this.info = info; + } + + public String toString() { + return "ActiveMQMessageProducer { value=" + info.getProducerId() + " }"; + } + + public void onProducerAck(ProducerAck pa) { + if (this.producerWindow != null) { + this.producerWindow.decreaseUsage(pa.getSize()); + } + } + +} Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+/**
+ * A useful base class for implementing a {@link MessageProducer}. It holds
+ * the producer defaults (delivery mode, priority, time to live), the
+ * message-ID and timestamp hints and the send timeout, and routes every
+ * {@code send} overload to the five-argument {@code send}, which this class
+ * does not implement.
+ *
+ * @version $Revision: $
+ */
+public abstract class ActiveMQMessageProducerSupport implements MessageProducer, Closeable {
+    protected ActiveMQSession session;
+    protected boolean disableMessageID;
+    protected boolean disableMessageTimestamp;
+    protected int defaultDeliveryMode;
+    protected int defaultPriority;
+    protected long defaultTimeToLive;
+    protected int sendTimeout=0;
+
+    /**
+     * @param session the owning session; its connection supplies the initial
+     *        value of the disable-timestamps flag
+     */
+    public ActiveMQMessageProducerSupport(ActiveMQSession session) {
+        this.session = session;
+        disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault();
+    }
+
+    /**
+     * Sets whether message IDs are disabled.
+     * <p>
+     * Since message IDs take some effort to create and increase a message's
+     * size, some JMS providers may be able to optimize message overhead if
+     * they are given a hint that the message ID is not used by an application.
+     * By calling the {@code setDisableMessageID} method on this message
+     * producer, a JMS client enables this potential optimization for all
+     * messages sent by this message producer. If the JMS provider accepts this
+     * hint, these messages must have the message ID set to null; if the
+     * provider ignores the hint, the message ID must be set to its normal
+     * unique value.
+     * <p>
+     * Message IDs are enabled by default.
+     *
+     * @param value indicates if message IDs are disabled
+     * @throws javax.jms.JMSException if {@link #checkClosed()} rejects the call
+     *                 (presumably because the producer has been closed).
+     */
+    public void setDisableMessageID(boolean value) throws JMSException {
+        checkClosed();
+        this.disableMessageID = value;
+    }
+
+    /**
+     * Gets an indication of whether message IDs are disabled.
+     *
+     * @return an indication of whether message IDs are disabled
+     * @throws javax.jms.JMSException if {@link #checkClosed()} rejects the call
+     *                 (presumably because the producer has been closed).
+     */
+    public boolean getDisableMessageID() throws JMSException {
+        checkClosed();
+        return this.disableMessageID;
+    }
+
+    /**
+     * Sets whether message timestamps are disabled.
+     * <p>
+     * Since timestamps take some effort to create and increase a message's
+     * size, some JMS providers may be able to optimize message overhead if
+     * they are given a hint that the timestamp is not used by an application.
+     * By calling the {@code setDisableMessageTimestamp} method on this
+     * message producer, a JMS client enables this potential optimization for
+     * all messages sent by this message producer. If the JMS provider accepts
+     * this hint, these messages must have the timestamp set to zero; if the
+     * provider ignores the hint, the timestamp must be set to its normal
+     * value.
+     * <p>
+     * The initial value is taken from the connection in the constructor.
+     *
+     * @param value indicates if message timestamps are disabled
+     * @throws javax.jms.JMSException if {@link #checkClosed()} rejects the call
+     *                 (presumably because the producer has been closed).
+     */
+    public void setDisableMessageTimestamp(boolean value) throws JMSException {
+        checkClosed();
+        this.disableMessageTimestamp = value;
+    }
+
+    /**
+     * Gets an indication of whether message timestamps are disabled.
+     *
+     * @return an indication of whether message timestamps are disabled
+     * @throws javax.jms.JMSException if {@link #checkClosed()} rejects the call
+     *                 (presumably because the producer has been closed).
+     */
+    public boolean getDisableMessageTimestamp() throws JMSException {
+        checkClosed();
+        return this.disableMessageTimestamp;
+    }
+
+    /**
+     * Sets the producer's default delivery mode.
+     * <p>
+     * The argument is validated before the closed check, so an illegal value
+     * is reported even on a closed producer.
+     *
+     * @param newDeliveryMode the message delivery mode for this message producer; legal
+     *                values are {@code DeliveryMode.NON_PERSISTENT} and
+     *                {@code DeliveryMode.PERSISTENT}
+     * @throws javax.jms.JMSException a {@link javax.jms.IllegalStateException} if the
+     *                 value is not one of the two legal modes, or whatever
+     *                 {@link #checkClosed()} throws.
+     * @see javax.jms.MessageProducer#getDeliveryMode
+     * @see javax.jms.DeliveryMode#NON_PERSISTENT
+     * @see javax.jms.DeliveryMode#PERSISTENT
+     * @see javax.jms.Message#DEFAULT_DELIVERY_MODE
+     */
+    public void setDeliveryMode(int newDeliveryMode) throws JMSException {
+        if (newDeliveryMode != DeliveryMode.PERSISTENT && newDeliveryMode != DeliveryMode.NON_PERSISTENT) {
+            throw new javax.jms.IllegalStateException("unkown delivery mode: " + newDeliveryMode);
+        }
+        checkClosed();
+        this.defaultDeliveryMode = newDeliveryMode;
+    }
+
+    /**
+     * Gets the producer's default delivery mode.
+     *
+     * @return the message delivery mode for this message producer
+     * @throws javax.jms.JMSException if {@link #checkClosed()} rejects the call
+     *                 (presumably because the producer has been closed).
+     */
+    public int getDeliveryMode() throws JMSException {
+        checkClosed();
+        return this.defaultDeliveryMode;
+    }
+
+    /**
+     * Sets the producer's default priority.
+     * <p>
+     * The JMS API defines ten levels of priority value, with 0 as the lowest
+     * priority and 9 as the highest. Clients should consider priorities 0-4 as
+     * gradations of normal priority and priorities 5-9 as gradations of
+     * expedited priority.
+     *
+     * @param newDefaultPriority the message priority for this message producer; must be a
+     *                value between 0 and 9
+     * @throws javax.jms.JMSException a {@link javax.jms.IllegalStateException} if the
+     *                 value is outside 0-9, or whatever {@link #checkClosed()}
+     *                 throws.
+     * @see javax.jms.MessageProducer#getPriority
+     * @see javax.jms.Message#DEFAULT_PRIORITY
+     */
+    public void setPriority(int newDefaultPriority) throws JMSException {
+        if (newDefaultPriority < 0 || newDefaultPriority > 9) {
+            throw new IllegalStateException("default priority must be a value between 0 and 9");
+        }
+        checkClosed();
+        this.defaultPriority = newDefaultPriority;
+    }
+
+    /**
+     * Gets the producer's default priority.
+     *
+     * @return the message priority for this message producer
+     * @throws javax.jms.JMSException if {@link #checkClosed()} rejects the call
+     *                 (presumably because the producer has been closed).
+     * @see javax.jms.MessageProducer#setPriority
+     */
+    public int getPriority() throws JMSException {
+        checkClosed();
+        return this.defaultPriority;
+    }
+
+    /**
+     * Sets the default length of time in milliseconds from its dispatch time
+     * that a produced message should be retained by the message system.
+     *
+     * @param timeToLive the message time to live in milliseconds; zero is unlimited
+     * @throws javax.jms.JMSException a {@link javax.jms.IllegalStateException} if the
+     *                 value is negative, or whatever {@link #checkClosed()}
+     *                 throws.
+     * @see javax.jms.MessageProducer#getTimeToLive
+     * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
+     */
+    public void setTimeToLive(long timeToLive) throws JMSException {
+        if (timeToLive < 0L) {
+            throw new IllegalStateException("cannot set a negative timeToLive");
+        }
+        checkClosed();
+        this.defaultTimeToLive = timeToLive;
+    }
+
+    /**
+     * Gets the default length of time in milliseconds from its dispatch time
+     * that a produced message should be retained by the message system.
+     *
+     * @return the message time to live in milliseconds; zero is unlimited
+     * @throws javax.jms.JMSException if {@link #checkClosed()} rejects the call
+     *                 (presumably because the producer has been closed).
+     * @see javax.jms.MessageProducer#setTimeToLive
+     */
+    public long getTimeToLive() throws JMSException {
+        checkClosed();
+        return this.defaultTimeToLive;
+    }
+
+    /**
+     * Sends a message using the {@code MessageProducer}'s default
+     * delivery mode, priority, and time to live, to the destination returned
+     * by {@code getDestination()}.
+     *
+     * @param message the message to send
+     * @throws javax.jms.JMSException if the JMS provider fails to send the message due to some
+     *                 internal error.
+     * @throws javax.jms.MessageFormatException if an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException if a client uses this method with a
+     *                 {@code MessageProducer} with an invalid destination.
+     * @throws UnsupportedOperationException
+     *                 if a client uses this method with a
+     *                 {@code MessageProducer} that did not specify a
+     *                 destination at creation time.
+     * @see javax.jms.Session#createProducer
+     * @see javax.jms.MessageProducer
+     * @since 1.1
+     */
+    public void send(Message message) throws JMSException {
+        this.send(this.getDestination(),
+                  message,
+                  this.defaultDeliveryMode,
+                  this.defaultPriority,
+                  this.defaultTimeToLive);
+    }
+
+    /**
+     * Sends a message to the destination returned by {@code getDestination()},
+     * specifying delivery mode, priority, and time to live.
+     *
+     * @param message the message to send
+     * @param deliveryMode the delivery mode to use
+     * @param priority the priority for this message
+     * @param timeToLive the message's lifetime (in milliseconds)
+     * @throws javax.jms.JMSException if the JMS provider fails to send the message due to some
+     *                 internal error.
+     * @throws javax.jms.MessageFormatException if an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException if a client uses this method with a
+     *                 {@code MessageProducer} with an invalid destination.
+     * @throws UnsupportedOperationException
+     *                 if a client uses this method with a
+     *                 {@code MessageProducer} that did not specify a
+     *                 destination at creation time.
+     * @see javax.jms.Session#createProducer
+     * @since 1.1
+     */
+    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+        this.send(this.getDestination(),
+                  message,
+                  deliveryMode,
+                  priority,
+                  timeToLive);
+    }
+
+    /**
+     * Sends a message to a destination for an unidentified message producer.
+     * Uses the {@code MessageProducer}'s default delivery mode,
+     * priority, and time to live.
+     * <p>
+     * Typically, a message producer is assigned a destination at creation
+     * time; however, the JMS API also supports unidentified message producers,
+     * which require that the destination be supplied every time a message is
+     * sent.
+     *
+     * @param destination the destination to send this message to
+     * @param message the message to send
+     * @throws javax.jms.JMSException if the JMS provider fails to send the message due to some
+     *                 internal error.
+     * @throws javax.jms.MessageFormatException if an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException if a client uses this method with an invalid destination.
+     * @throws UnsupportedOperationException
+     *                 if a client uses this method with a
+     *                 {@code MessageProducer} that specified a destination at
+     *                 creation time.
+     * @see javax.jms.Session#createProducer
+     * @see javax.jms.MessageProducer
+     */
+    public void send(Destination destination, Message message) throws JMSException {
+        this.send(destination,
+                  message,
+                  this.defaultDeliveryMode,
+                  this.defaultPriority,
+                  this.defaultTimeToLive);
+    }
+
+
+    /**
+     * Hook called at the start of every accessor in this class; subclasses
+     * decide when the producer counts as closed.
+     *
+     * @throws IllegalStateException presumably when the producer has been
+     *                 closed - confirm against the implementing subclass.
+     */
+    protected abstract void checkClosed() throws IllegalStateException;
+
+    /**
+     * @return the sendTimeout
+     */
+    public int getSendTimeout() {
+        return sendTimeout;
+    }
+
+    /**
+     * @param sendTimeout the sendTimeout to set
+     */
+    public void setSendTimeout(int sendTimeout) {
+        this.sendTimeout = sendTimeout;
+    }
+}
Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java (added)
+++
activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.Enumeration; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageEOFException; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.command.ActiveMQTempTopic; +import 
org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;

/**
 * A helper class for converting normal JMS interfaces into ActiveMQ specific
 * ones.
 *
 * @version $Revision: 1.1 $
 */
public final class ActiveMQMessageTransformation {

    /** Size of the scratch buffer used when copying the body of a foreign BytesMessage. */
    private static final int COPY_BUFFER_SIZE = 1024;

    private ActiveMQMessageTransformation() {
    }

    /**
     * Converts a JMS destination from any provider into ActiveMQ's own
     * destination implementation.
     *
     * @param destination - Destination to be converted into ActiveMQ's
     *                implementation. May be <code>null</code>.
     * @return ActiveMQDestination - the given object itself if it already is
     *         an ActiveMQDestination, otherwise a new ActiveMQ destination of
     *         the matching kind (temporary queue, temporary topic, queue or
     *         topic). <code>null</code> is returned when the argument is
     *         <code>null</code> or is none of those kinds.
     * @throws JMSException if the name of the foreign destination cannot be
     *                 read
     */
    public static ActiveMQDestination transformDestination(Destination destination) throws JMSException {
        if (destination == null) {
            return null;
        }
        if (destination instanceof ActiveMQDestination) {
            return (ActiveMQDestination)destination;
        }

        // The temporary variants are tested first: TemporaryQueue extends Queue
        // and TemporaryTopic extends Topic, so the generic checks would
        // otherwise match them too.
        if (destination instanceof TemporaryQueue) {
            return new ActiveMQTempQueue(((Queue)destination).getQueueName());
        }
        if (destination instanceof TemporaryTopic) {
            return new ActiveMQTempTopic(((Topic)destination).getTopicName());
        }
        if (destination instanceof Queue) {
            return new ActiveMQQueue(((Queue)destination).getQueueName());
        }
        if (destination instanceof Topic) {
            return new ActiveMQTopic(((Topic)destination).getTopicName());
        }

        return null;
    }

    /**
     * Returns the message unchanged when it already is an ActiveMQMessage, or
     * creates a whole new ActiveMQ message instance from a JMS message of
     * another provider, copying its body, its JMS headers and its properties.
     * <p>
     * A failure while reading the foreign message is reported to the caller
     * instead of silently producing a message with a truncated body.
     *
     * @param message - Message to be converted into ActiveMQ's implementation.
     *                Must not be <code>null</code>: a <code>null</code>
     *                argument fails with a NullPointerException when the
     *                headers are copied.
     * @param connection - the connection that is set on every newly created
     *                ActiveMQ message
     * @return ActiveMQMessage - ActiveMQ's implementation object of the
     *         message.
     * @throws JMSException if the foreign message cannot be read or the new
     *                 message cannot be populated
     */
    public static ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection)
        throws JMSException {
        if (message instanceof ActiveMQMessage) {
            return (ActiveMQMessage)message;
        }

        ActiveMQMessage activeMessage;
        if (message instanceof BytesMessage) {
            activeMessage = copyBytesMessage((BytesMessage)message, connection);
        } else if (message instanceof MapMessage) {
            activeMessage = copyMapMessage((MapMessage)message, connection);
        } else if (message instanceof ObjectMessage) {
            activeMessage = copyObjectMessage((ObjectMessage)message, connection);
        } else if (message instanceof StreamMessage) {
            activeMessage = copyStreamMessage((StreamMessage)message, connection);
        } else if (message instanceof TextMessage) {
            activeMessage = copyTextMessage((TextMessage)message, connection);
        } else {
            // Plain javax.jms.Message: there is no body to copy.
            activeMessage = new ActiveMQMessage();
            activeMessage.setConnection(connection);
        }

        copyProperties(message, activeMessage);

        return activeMessage;
    }

    /** Copies the body of a foreign BytesMessage into a new ActiveMQBytesMessage. */
    private static ActiveMQMessage copyBytesMessage(BytesMessage bytesMsg, ActiveMQConnection connection)
        throws JMSException {
        // Rewind so that the body is read from its first byte.
        bytesMsg.reset();
        ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
        msg.setConnection(connection);

        // readBytes() reports the end of the body by returning -1, so the copy
        // needs neither a per-byte call nor an exception to terminate.
        byte[] buffer = new byte[COPY_BUFFER_SIZE];
        int count;
        while ((count = bytesMsg.readBytes(buffer)) != -1) {
            msg.writeBytes(buffer, 0, count);
        }
        return msg;
    }

    /** Copies every named entry of a foreign MapMessage into a new ActiveMQMapMessage. */
    private static ActiveMQMessage copyMapMessage(MapMessage mapMsg, ActiveMQConnection connection)
        throws JMSException {
        ActiveMQMapMessage msg = new ActiveMQMapMessage();
        msg.setConnection(connection);

        Enumeration names = mapMsg.getMapNames();
        while (names.hasMoreElements()) {
            String name = names.nextElement().toString();
            msg.setObject(name, mapMsg.getObject(name));
        }
        return msg;
    }

    /** Copies the payload of a foreign ObjectMessage into a new ActiveMQObjectMessage. */
    private static ActiveMQMessage copyObjectMessage(ObjectMessage objMsg, ActiveMQConnection connection)
        throws JMSException {
        ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
        msg.setConnection(connection);
        msg.setObject(objMsg.getObject());
        msg.storeContent();
        return msg;
    }

    /** Copies the entries of a foreign StreamMessage into a new ActiveMQStreamMessage. */
    private static ActiveMQMessage copyStreamMessage(StreamMessage streamMessage, ActiveMQConnection connection)
        throws JMSException {
        // Rewind so that the stream is read from its first entry.
        streamMessage.reset();
        ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
        msg.setConnection(connection);

        try {
            // NOTE(review): the copy also stops at the first null entry, so
            // entries following a null value are not copied. This is kept as
            // is because it presumably guards against providers that signal
            // the end of the stream with null - confirm before changing.
            Object entry;
            while ((entry = streamMessage.readObject()) != null) {
                msg.writeObject(entry);
            }
        } catch (MessageEOFException endOfStream) {
            // Expected: this is how a StreamMessage reports the end of its
            // body. Any other JMSException is a real failure and propagates.
        }
        return msg;
    }

    /** Copies the text of a foreign TextMessage into a new ActiveMQTextMessage. */
    private static ActiveMQMessage copyTextMessage(TextMessage textMsg, ActiveMQConnection connection)
        throws JMSException {
        ActiveMQTextMessage msg = new ActiveMQTextMessage();
        msg.setConnection(connection);
        msg.setText(textMsg.getText());
        return msg;
    }

    /**
     * Copies the standard JMS headers and the user defined properties from the
     * given message to the specified message. The reply-to and destination
     * headers are converted with
     * {@link #transformDestination(Destination)} before they are set.
     *
     * @param fromMessage the message to take the properties from
     * @param toMessage the message to add the properties to
     * @throws JMSException if a header or property cannot be read from the
     *                 source or set on the target
     */
    public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException {
        toMessage.setJMSMessageID(fromMessage.getJMSMessageID());
        toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID());
        toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
        toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination()));
        toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode());
        toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered());
        toMessage.setJMSType(fromMessage.getJMSType());
        toMessage.setJMSExpiration(fromMessage.getJMSExpiration());
        toMessage.setJMSPriority(fromMessage.getJMSPriority());
        toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp());

        Enumeration propertyNames = fromMessage.getPropertyNames();

        while (propertyNames.hasMoreElements()) {
            String name = propertyNames.nextElement().toString();
            Object obj = fromMessage.getObjectProperty(name);
            toMessage.setObjectProperty(name, obj);
        }
    }
}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java ------------------------------------------------------------------------------ svn:executable = *