activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1183143 - in /activemq/trunk: activemq-camel/src/test/java/org/apache/activemq/camel/component/ activemq-core/src/main/java/org/apache/activemq/
Date Thu, 13 Oct 2011 22:28:47 GMT
Author: tabish
Date: Thu Oct 13 22:28:47 2011
New Revision: 1183143

URL: http://svn.apache.org/viewvc?rev=1183143&view=rev
Log:
apply fix for: https://issues.apache.org/jira/browse/AMQ-3500

Modified:
    activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/component/ActiveMQRouteTest.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java

Modified: activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/component/ActiveMQRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/component/ActiveMQRouteTest.java?rev=1183143&r1=1183142&r2=1183143&view=diff
==============================================================================
--- activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/component/ActiveMQRouteTest.java (original)
+++ activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/component/ActiveMQRouteTest.java Thu Oct 13 22:28:47 2011
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.camel.component;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 
@@ -28,7 +30,7 @@ import org.springframework.jms.connectio
 import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
 
 /**
- * 
+ *
  */
 public class ActiveMQRouteTest extends CamelTestSupport {
     protected MockEndpoint resultEndpoint;
@@ -67,6 +69,36 @@ public class ActiveMQRouteTest extends C
         return camelContext;
     }
 
+    @Test
+    public void testInvalidDestinationOptionOnConsumer() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        assertMockEndpointsSatisfied(1, TimeUnit.SECONDS);
+        try {
+            new RouteBuilder() {
+                public void configure() throws Exception {
+                    from("activemq:queue:foo?destination.consumer.exclusive=true&destination.consumer.unknown=foo")
+                        .to("mock:result");
+                }
+            };
+        } catch (Exception e) {
+            fail("Should not have accepted bad destination options.");
+        }
+    }
+
+    @Test
+    public void testInvalidDestinationOptionOnProducer() throws Exception {
+        try {
+            new RouteBuilder() {
+                public void configure() throws Exception {
+                    from("activemq:queue:foo")
+                        .to("activemq:queue:bar?destination.producer.exclusive=true");
+                }
+            };
+        } catch (Exception e) {
+            fail("Should not have accepted bad destination options.");
+        }
+    }
+
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1183143&r1=1183142&r2=1183143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Oct 13 22:28:47 2011
@@ -91,8 +91,8 @@ import org.slf4j.LoggerFactory;
  * <P>
  * It is a client programming error for a <CODE>MessageListener</CODE> to
  * throw an exception.
- * 
- * 
+ *
+ *
  * @see javax.jms.MessageConsumer
  * @see javax.jms.QueueReceiver
  * @see javax.jms.TopicSubscriber
@@ -148,7 +148,7 @@ public class ActiveMQMessageConsumer imp
     private long lastDeliveredSequenceId;
 
     private IOException failureError;
-    
+
     private long optimizeAckTimestamp = System.currentTimeMillis();
     private long optimizeAcknowledgeTimeOut = 0;
     private long failoverRedeliveryWaitPeriod = 0;
@@ -156,7 +156,7 @@ public class ActiveMQMessageConsumer imp
 
     /**
      * Create a MessageConsumer
-     * 
+     *
      * @param session
      * @param dest
      * @param name
@@ -225,6 +225,15 @@ public class ActiveMQMessageConsumer imp
         if (dest.getOptions() != null) {
             Map<String, String> options = new HashMap<String, String>(dest.getOptions());
             IntrospectionSupport.setProperties(this.info, options, "consumer.");
+            if (options.size() > 0) {
+                String msg = "There are " + options.size()
+                    + " consumer options that couldn't be set on the consumer."
+                    + " Check the options are spelled correctly."
+                    + " Unknown parameters=[" + options + "]."
+                    + " This consumer cannot be started.";
+                LOG.warn(msg);
+                throw new ConfigurationException(msg);
+            }
         }
 
         this.info.setDestination(dest);
@@ -329,7 +338,7 @@ public class ActiveMQMessageConsumer imp
 
     /**
      * Retrieve is a browser
-     * 
+     *
      * @return true if a browser
      */
     protected boolean isBrowser() {
@@ -359,7 +368,7 @@ public class ActiveMQMessageConsumer imp
 
     /**
      * Gets this message consumer's message selector expression.
-     * 
+     *
      * @return this message consumer's message selector, or null if no message
      *         selector exists for the message consumer (that is, if the message
      *         selector was not set or was set to null or the empty string)
@@ -373,7 +382,7 @@ public class ActiveMQMessageConsumer imp
 
     /**
      * Gets the message consumer's <CODE>MessageListener</CODE>.
-     * 
+     *
      * @return the listener for the message consumer, or null if no listener is
      *         set
      * @throws JMSException if the JMS provider fails to get the message
@@ -394,7 +403,7 @@ public class ActiveMQMessageConsumer imp
      * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE>
      * while messages are being consumed by an existing listener or the consumer
      * is being used to consume messages synchronously is undefined.
-     * 
+     *
      * @param listener the listener to which the messages are to be delivered
      * @throws JMSException if the JMS provider fails to receive the next
      *                 message due to some internal error.
@@ -443,7 +452,7 @@ public class ActiveMQMessageConsumer imp
      * then it it tries to not block at all, it returns a message if it is
      * available - if timeout>0 then it blocks up to timeout amount of time.
      * Expired messages will consumed by this method.
-     * 
+     *
      * @throws JMSException
      * @return null if we timeout or if the consumer is closed.
      */
@@ -459,11 +468,11 @@ public class ActiveMQMessageConsumer imp
                     if (timeout > 0 && !unconsumedMessages.isClosed()) {
                         timeout = Math.max(deadline - System.currentTimeMillis(), 0);
                     } else {
-                    	if (failureError != null) {
-                    		throw JMSExceptionSupport.create(failureError);
-                    	} else {
-                    		return null;
-                    	}
+                        if (failureError != null) {
+                            throw JMSExceptionSupport.create(failureError);
+                        } else {
+                            return null;
+                        }
                     }
                 } else if (md.getMessage() == null) {
                     return null;
@@ -497,7 +506,7 @@ public class ActiveMQMessageConsumer imp
      * <P>
      * If this <CODE>receive</CODE> is done within a transaction, the consumer
      * retains the message until the transaction commits.
-     * 
+     *
      * @return the next message produced for this message consumer, or null if
      *         this message consumer is concurrently closed
      */
@@ -524,7 +533,7 @@ public class ActiveMQMessageConsumer imp
     private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
         ActiveMQMessage m = (ActiveMQMessage)md.getMessage().copy();
         if (m.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
-        	((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
+            ((ActiveMQBlobMessage)m).setBlobDownloader(new BlobDownloader(session.getBlobTransferPolicy()));
         }
         if (transformer != null) {
             Message transformedMessage = transformer.consumerTransform(session, this, m);
@@ -557,7 +566,7 @@ public class ActiveMQMessageConsumer imp
      * This call blocks until a message arrives, the timeout expires, or this
      * message consumer is closed. A <CODE>timeout</CODE> of zero never
      * expires, and the call blocks indefinitely.
-     * 
+     *
      * @param timeout the timeout value (in milliseconds), a time out of zero
      *                never expires.
      * @return the next message produced for this message consumer, or null if
@@ -595,7 +604,7 @@ public class ActiveMQMessageConsumer imp
 
     /**
      * Receives the next message if one is immediately available.
-     * 
+     *
      * @return the next message produced for this message consumer, or null if
      *         one is not available
      * @throws JMSException if the JMS provider fails to receive the next
@@ -635,7 +644,7 @@ public class ActiveMQMessageConsumer imp
      * This call blocks until a <CODE>receive</CODE> or message listener in
      * progress has completed. A blocked message consumer <CODE>receive </CODE>
      * call returns null when this message consumer is closed.
-     * 
+     *
      * @throws JMSException if the JMS provider fails to close the consumer due
      *                 to some internal error.
      */
@@ -655,7 +664,7 @@ public class ActiveMQMessageConsumer imp
                 });
             } else {
                 doClose();
-            } 
+            }
         }
     }
 
@@ -668,13 +677,13 @@ public class ActiveMQMessageConsumer imp
         removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
         this.session.asyncSendPacket(removeCommand);
     }
-    
+
     void inProgressClearRequired() {
         inProgressClearRequiredFlag = true;
         // deal with delivered messages async to avoid lock contention with in progress acks
         clearDispatchList = true;
     }
-    
+
     void clearMessagesInProgress() {
         if (inProgressClearRequiredFlag) {
             synchronized (unconsumedMessages.getMutex()) {
@@ -706,18 +715,18 @@ public class ActiveMQMessageConsumer imp
                     if (ack != null) {
                         deliveredMessages.clear();
                         ackCounter = 0;
-            		} else {
-            		    ack = pendingAck;
-            		    pendingAck = null;
-            		}
-            	}
+                    } else {
+                        ack = pendingAck;
+                        pendingAck = null;
+                    }
+                }
             } else if (pendingAck != null && pendingAck.isStandardAck()) {
                 ack = pendingAck;
                 pendingAck = null;
             }
             if (ack != null) {
                 final MessageAck ackToSend = ack;
-                
+
                 if (executorService == null) {
                     executorService = Executors.newSingleThreadExecutor();
                 }
@@ -740,10 +749,10 @@ public class ActiveMQMessageConsumer imp
 
     public void dispose() throws JMSException {
         if (!unconsumedMessages.isClosed()) {
-            
+
             // Do we have any acks we need to send out before closing?
             // Ack any delivered messages now.
-            if (!session.getTransacted()) { 
+            if (!session.getTransacted()) {
                 deliverAcks();
                 if (isAutoAcknowledgeBatch()) {
                     acknowledge();
@@ -757,7 +766,7 @@ public class ActiveMQMessageConsumer imp
                     Thread.currentThread().interrupt();
                 }
             }
-            
+
             if (session.isClientAcknowledge()) {
                 if (!this.info.isBrowser()) {
                     // rollback duplicates that aren't acknowledged
@@ -874,13 +883,13 @@ public class ActiveMQMessageConsumer imp
                             if (optimizeAcknowledge) {
                                 ackCounter++;
                                 if (ackCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
-                                	MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
-                                	if (ack != null) {
-                            		    deliveredMessages.clear();
-                            		    ackCounter = 0;
-                            		    session.sendAck(ack);
-                            		    optimizeAckTimestamp = System.currentTimeMillis();
-                                	}
+                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
+                                    if (ack != null) {
+                                        deliveredMessages.clear();
+                                        ackCounter = 0;
+                                        session.sendAck(ack);
+                                        optimizeAckTimestamp = System.currentTimeMillis();
+                                    }
                                 }
                             } else {
                                 MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
@@ -903,7 +912,7 @@ public class ActiveMQMessageConsumer imp
                 if (messageUnackedByConsumer) {
                     ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                 }
-            } 
+            }
             else {
                 throw new IllegalStateException("Invalid session state.");
             }
@@ -913,21 +922,21 @@ public class ActiveMQMessageConsumer imp
     /**
      * Creates a MessageAck for all messages contained in deliveredMessages.
      * Caller should hold the lock for deliveredMessages.
-     * 
-     * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE) 
+     *
+     * @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE)
      * @return <code>null</code> if nothing to ack.
      */
-	private MessageAck makeAckForAllDeliveredMessages(byte type) {
-		synchronized (deliveredMessages) {
-			if (deliveredMessages.isEmpty())
-				return null;
-			    
-			MessageDispatch md = deliveredMessages.getFirst();
-		    MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
-		    ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
-		    return ack;
-		}
-	}
+    private MessageAck makeAckForAllDeliveredMessages(byte type) {
+        synchronized (deliveredMessages) {
+            if (deliveredMessages.isEmpty())
+                return null;
+
+            MessageDispatch md = deliveredMessages.getFirst();
+            MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
+            ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
+            return ack;
+        }
+    }
 
     private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
 
@@ -938,7 +947,7 @@ public class ActiveMQMessageConsumer imp
         }
 
         deliveredCounter++;
-        
+
         MessageAck oldPendingAck = pendingAck;
         pendingAck = new MessageAck(md, ackType, deliveredCounter);
         pendingAck.setTransactionId(session.getTransactionContext().getTransactionId());
@@ -960,7 +969,7 @@ public class ActiveMQMessageConsumer imp
                 }
             }
         }
-        
+
         if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
             session.sendAck(pendingAck);
             pendingAck=null;
@@ -1006,7 +1015,7 @@ public class ActiveMQMessageConsumer imp
     /**
      * Acknowledge all the messages that have been delivered to the client up to
      * this point.
-     * 
+     *
      * @throws JMSException
      */
     public void acknowledge() throws JMSException {
@@ -1016,8 +1025,8 @@ public class ActiveMQMessageConsumer imp
             // Acknowledge all messages so far.
             MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
             if (ack == null)
-            	return; // no msgs
-            
+                return; // no msgs
+
             if (session.getTransacted()) {
                 rollbackOnFailedRecoveryRedelivery();
                 session.doStartTransaction();
@@ -1025,17 +1034,17 @@ public class ActiveMQMessageConsumer imp
             }
             session.sendAck(ack);
             pendingAck = null;
-            
+
             // Adjust the counters
             deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
             additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
-            
-            if (!session.getTransacted()) {  
+
+            if (!session.getTransacted()) {
                 deliveredMessages.clear();
-            } 
+            }
         }
     }
-    
+
     private void waitForRedeliveries() {
         if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null) {
             long expiry = System.currentTimeMillis() + failoverRedeliveryWaitPeriod;
@@ -1043,7 +1052,7 @@ public class ActiveMQMessageConsumer imp
             do {
                 numberNotReplayed = 0;
                 synchronized(deliveredMessages) {
-                    if (previouslyDeliveredMessages != null) { 
+                    if (previouslyDeliveredMessages != null) {
                         for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
                             if (!entry.getValue()) {
                                 numberNotReplayed++;
@@ -1077,17 +1086,17 @@ public class ActiveMQMessageConsumer imp
                     numberNotReplayed++;
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("previously delivered message has not been replayed in transaction: "
-                                + previouslyDeliveredMessages.transactionId 
+                                + previouslyDeliveredMessages.transactionId
                                 + " , messageId: " + entry.getKey());
                     }
                 }
             }
             if (numberNotReplayed > 0) {
-                String message = "rolling back transaction (" 
+                String message = "rolling back transaction ("
                     + previouslyDeliveredMessages.transactionId + ") post failover recovery. " + numberNotReplayed
                     + " previously delivered message(s) not replayed to consumer: " + this.getConsumerId();
                 LOG.warn(message);
-                throw new TransactionRolledBackException(message);   
+                throw new TransactionRolledBackException(message);
             }
         }
     }
@@ -1128,7 +1137,7 @@ public class ActiveMQMessageConsumer imp
                 if (deliveredMessages.isEmpty()) {
                     return;
                 }
-    
+
                 // use initial delay for first redelivery
                 MessageDispatch lastMd = deliveredMessages.getFirst();
                 final int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
@@ -1138,44 +1147,44 @@ public class ActiveMQMessageConsumer imp
                     redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                 }
                 MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
-    
+
                 for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
                     MessageDispatch md = iter.next();
                     md.getMessage().onMessageRolledBack();
                     // ensure we don't filter this as a duplicate
                     session.connection.rollbackDuplicate(this, md.getMessage());
                 }
-    
+
                 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
                     && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
                     // We need to NACK the messages so that they get sent to the
                     // DLQ.
                     // Acknowledge the last message.
-                    
+
                     MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
                     ack.setPoisonCause(lastMd.getRollbackCause());
-					ack.setFirstMessageId(firstMsgId);
+                    ack.setFirstMessageId(firstMsgId);
                     session.sendAck(ack,true);
                     // Adjust the window size.
                     additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
                     redeliveryDelay = 0;
                 } else {
-                    
+
                     // only redelivery_ack after first delivery
                     if (currentRedeliveryCount > 0) {
                         MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
                         ack.setFirstMessageId(firstMsgId);
                         session.sendAck(ack,true);
                     }
-    
+
                     // stop the delivery of messages.
                     unconsumedMessages.stop();
-    
+
                     for (Iterator<MessageDispatch> iter = deliveredMessages.iterator(); iter.hasNext();) {
                         MessageDispatch md = iter.next();
                         unconsumedMessages.enqueueFirst(md);
                     }
-    
+
                     if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
                         // Start up the delivery again a little later.
                         scheduler.executeAfterDelay(new Runnable() {
@@ -1192,7 +1201,7 @@ public class ActiveMQMessageConsumer imp
                     } else {
                         start();
                     }
-    
+
                 }
                 deliveredCounter -= deliveredMessages.size();
                 deliveredMessages.clear();
@@ -1205,13 +1214,13 @@ public class ActiveMQMessageConsumer imp
 
     /*
      * called with unconsumedMessages && deliveredMessages locked
-     * remove any message not re-delivered as they can't be replayed to this 
+     * remove any message not re-delivered as they can't be replayed to this
      * consumer on rollback
      */
     private void rollbackPreviouslyDeliveredAndNotRedelivered() {
         if (previouslyDeliveredMessages != null) {
             for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
-                if (!entry.getValue()) {              
+                if (!entry.getValue()) {
                     removeFromDeliveredMessages(entry.getKey());
                 }
             }
@@ -1305,7 +1314,7 @@ public class ActiveMQMessageConsumer imp
                             }
                             if (needsPoisonAck) {
                                 LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
-                                        + " consumer on this connection, failoverRedeliveryWaitPeriod=" 
+                                        + " consumer on this connection, failoverRedeliveryWaitPeriod="
                                         + failoverRedeliveryWaitPeriod + ". Message: " + md);
                                 MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                                 poisonAck.setFirstMessageId(md.getMessage().getMessageId());
@@ -1333,10 +1342,10 @@ public class ActiveMQMessageConsumer imp
     // async (on next call) clear or track delivered as they may be flagged as duplicates if they arrive again
     private void clearDispatchList() {
         if (clearDispatchList) {
-            synchronized (deliveredMessages) {  
+            synchronized (deliveredMessages) {
                 if (clearDispatchList) {
                     if (!deliveredMessages.isEmpty()) {
-                        if (session.isTransacted()) {    
+                        if (session.isTransacted()) {
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug(getConsumerId() + " tracking existing transacted delivered list (" + deliveredMessages.size() + ") on transport interrupt");
                             }
@@ -1386,7 +1395,7 @@ public class ActiveMQMessageConsumer imp
 
     /**
      * Delivers a message to the message listener.
-     * 
+     *
      * @return
      * @throws JMSException
      */
@@ -1410,11 +1419,11 @@ public class ActiveMQMessageConsumer imp
         return lastDeliveredSequenceId;
     }
 
-	public IOException getFailureError() {
-		return failureError;
-	}
-
-	public void setFailureError(IOException failureError) {
-		this.failureError = failureError;
-	}
+    public IOException getFailureError() {
+        return failureError;
+    }
+
+    public void setFailureError(IOException failureError) {
+        this.failureError = failureError;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?rev=1183143&r1=1183142&r2=1183143&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Thu Oct 13 22:28:47 2011
@@ -33,6 +33,8 @@ import org.apache.activemq.management.St
 import org.apache.activemq.management.StatsImpl;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.util.IntrospectionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
@@ -60,14 +62,16 @@ import org.apache.activemq.util.Introspe
  * <P>
  * A JMS provider should do its best to expire messages accurately; however, the
  * JMS API does not define the accuracy provided.
- * 
- * 
+ *
+ *
  * @see javax.jms.TopicPublisher
  * @see javax.jms.QueueSender
  * @see javax.jms.Session#createProducer
  */
 public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
 
+    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageProducer.class);
+
     protected ProducerInfo info;
     protected boolean closed;
 
@@ -81,10 +85,21 @@ public class ActiveMQMessageProducer ext
         super(session);
         this.info = new ProducerInfo(producerId);
         this.info.setWindowSize(session.connection.getProducerWindowSize());
+        // Allows the options on the destination to configure the producerInfo
         if (destination != null && destination.getOptions() != null) {
             Map<String, String> options = new HashMap<String, String>(destination.getOptions());
             IntrospectionSupport.setProperties(this.info, options, "producer.");
+            if (options.size() > 0) {
+                String msg = "There are " + options.size()
+                    + " producer options that couldn't be set on the producer."
+                    + " Check the options are spelled correctly."
+                    + " Unknown parameters=[" + options + "]."
+                    + " This producer cannot be started.";
+                LOG.warn(msg);
+                throw new ConfigurationException(msg);
+            }
         }
+
         this.info.setDestination(destination);
 
         // Enable producer window flow control if protocol > 3 and the window
@@ -118,7 +133,7 @@ public class ActiveMQMessageProducer ext
 
     /**
      * Gets the destination associated with this <CODE>MessageProducer</CODE>.
-     * 
+     *
      * @return this producer's <CODE>Destination/ <CODE>
      * @throws JMSException if the JMS provider fails to close the producer due to
      *                      some internal error.
@@ -137,7 +152,7 @@ public class ActiveMQMessageProducer ext
      * outside the Java virtual machine, clients should close them when they are
      * not needed. Relying on garbage collection to eventually reclaim these
      * resources may not be timely enough.
-     * 
+     *
      * @throws JMSException if the JMS provider fails to close the producer due
      *                 to some internal error.
      */
@@ -160,7 +175,7 @@ public class ActiveMQMessageProducer ext
 
     /**
      * Check if the instance of this producer has been closed.
-     * 
+     *
      * @throws IllegalStateException
      */
     @Override
@@ -177,7 +192,7 @@ public class ActiveMQMessageProducer ext
      * Typically, a message producer is assigned a destination at creation time;
      * however, the JMS API also supports unidentified message producers, which
      * require that the destination be supplied every time a message is sent.
-     * 
+     *
      * @param destination the destination to send this message to
      * @param message the message to send
      * @param deliveryMode the delivery mode to use



Mime
View raw message