activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r487232 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Date Thu, 14 Dec 2006 15:18:26 GMT
Author: rajdavies
Date: Thu Dec 14 07:18:25 2006
New Revision: 487232

URL: http://svn.apache.org/viewvc?view=rev&rev=487232
Log:
synchronize around the send on a Message

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=487232&r1=487231&r2=487232
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu Dec 14 07:18:25 2006
@@ -199,6 +199,7 @@
     protected boolean closed;
     protected boolean asyncDispatch;
     protected boolean sessionAsyncDispatch;
+    protected Object sendMutex = new Object();
 
     /**
      * Construct the Session
@@ -1493,92 +1494,87 @@
      *            message expiration.
      * @throws JMSException
      */
-    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination,
Message message, int deliveryMode,
-            int priority, long timeToLive) throws JMSException {
-        checkClosed();
-
-        if( destination.isTemporary() && connection.isDeleted(destination) ) {
-            throw new JMSException("Cannot publish to a deleted Destination: "+destination);
-        }
-
-        // tell the Broker we are about to start a new transaction
-        doStartTransaction();
-        TransactionId txid = transactionContext.getTransactionId();
-        
-        message.setJMSDestination(destination);
-        message.setJMSDeliveryMode(deliveryMode);        
-        long expiration = 0L;
-
-        if (!producer.getDisableMessageTimestamp()) {
-            long timeStamp = System.currentTimeMillis();
-            message.setJMSTimestamp(timeStamp);
-            if (timeToLive > 0) {
-                expiration = timeToLive + timeStamp;
-            }
-        }
-
-        message.setJMSExpiration(expiration);
-        message.setJMSPriority(priority);
-        long sequenceNumber = producer.getMessageSequence();
-        
-        message.setJMSRedelivered(false);        
-
-        // transform to our own message format here
-        ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
-        // Set the message id.
-        if( msg == message ) {
-            msg.setMessageId( new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)
);
-        } else {
-            msg.setMessageId( new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)
);
-            message.setJMSMessageID(msg.getMessageId().toString());
-        }
-        
-        msg.setTransactionId(txid);
-
-        if ( connection.isCopyMessageOnSend() ){
-            msg = (ActiveMQMessage) msg.copy();
-        }        
-        msg.onSend();
-        msg.setProducerId(msg.getMessageId().getProducerId());
-
-        if (log.isDebugEnabled()) {
-            log.debug("Sending message: " + msg);
-        }
-
-        if(!msg.isPersistent() || connection.isUseAsyncSend() || txid!=null) {
-            this.connection.asyncSendPacket(msg);
-        } else {
-            this.connection.syncSendPacket(msg);
-        }
-
-    }
-
-    /**
-     * Send TransactionInfo to indicate transaction has started
-     * 
-     * @throws JMSException
-     *             if some internal error occurs
-     */
-    protected void doStartTransaction() throws JMSException {
-        if (getTransacted() && !transactionContext.isInXATransaction()) {
-            transactionContext.begin();
-        }
-    }
-
-    /**
-     * Checks whether the session has unconsumed messages.
-     * 
-     * @return true - if there are unconsumed messages.
-     */
+    protected void send(ActiveMQMessageProducer producer,
+	        ActiveMQDestination destination,Message message,int deliveryMode,
+	        int priority,long timeToLive) throws JMSException{
+		checkClosed();
+		if(destination.isTemporary()&&connection.isDeleted(destination)){
+			throw new JMSException("Cannot publish to a deleted Destination: "
+			        +destination);
+		}
+		synchronized(sendMutex){
+			// tell the Broker we are about to start a new transaction
+			doStartTransaction();
+			TransactionId txid=transactionContext.getTransactionId();
+			message.setJMSDestination(destination);
+			message.setJMSDeliveryMode(deliveryMode);
+			long expiration=0L;
+			if(!producer.getDisableMessageTimestamp()){
+				long timeStamp=System.currentTimeMillis();
+				message.setJMSTimestamp(timeStamp);
+				if(timeToLive>0){
+					expiration=timeToLive+timeStamp;
+				}
+			}
+			message.setJMSExpiration(expiration);
+			message.setJMSPriority(priority);
+			long sequenceNumber=producer.getMessageSequence();
+			message.setJMSRedelivered(false);
+			// transform to our own message format here
+			ActiveMQMessage msg=ActiveMQMessageTransformation.transformMessage(
+			        message,connection);
+			// Set the message id.
+			if(msg==message){
+				msg.setMessageId(new MessageId(producer.getProducerInfo()
+				        .getProducerId(),sequenceNumber));
+			}else{
+				msg.setMessageId(new MessageId(producer.getProducerInfo()
+				        .getProducerId(),sequenceNumber));
+				message.setJMSMessageID(msg.getMessageId().toString());
+			}
+			msg.setTransactionId(txid);
+			if(connection.isCopyMessageOnSend()){
+				msg=(ActiveMQMessage)msg.copy();
+			}
+			msg.onSend();
+			msg.setProducerId(msg.getMessageId().getProducerId());
+			if(log.isDebugEnabled()){
+				log.debug("Sending message: "+msg);
+			}
+			if(!msg.isPersistent()||connection.isUseAsyncSend()||txid!=null){
+				this.connection.asyncSendPacket(msg);
+			}else{
+				this.connection.syncSendPacket(msg);
+			}
+		}
+	}
+
+	/**
+	 * Send TransactionInfo to indicate transaction has started
+	 * 
+	 * @throws JMSException
+	 *             if some internal error occurs
+	 */
+	protected void doStartTransaction() throws JMSException{
+		if(getTransacted()&&!transactionContext.isInXATransaction()){
+			transactionContext.begin();
+		}
+	}
+
+    /**
+	 * Checks whether the session has unconsumed messages.
+	 * 
+	 * @return true - if there are unconsumed messages.
+	 */
     public boolean hasUncomsumedMessages() {
         return executor.hasUncomsumedMessages();
     }
 
     /**
-     * Checks whether the session uses transactions.
-     * 
-     * @return true - if the session uses transactions.
-     */
+	 * Checks whether the session uses transactions.
+	 * 
+	 * @return true - if the session uses transactions.
+	 */
     public boolean isTransacted() {
         return this.acknowledgementMode == Session.SESSION_TRANSACTED;
     }



Mime
View raw message