From activemq-commits-return-4238-apmail-geronimo-activemq-commits-archive=geronimo.apache.org@geronimo.apache.org Thu Dec 14 15:19:13 2006 Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 90409 invoked from network); 14 Dec 2006 15:19:13 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 14 Dec 2006 15:19:13 -0000 Received: (qmail 5944 invoked by uid 500); 14 Dec 2006 15:19:20 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 5927 invoked by uid 500); 14 Dec 2006 15:19:20 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 5917 invoked by uid 99); 14 Dec 2006 15:19:20 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Dec 2006 07:19:20 -0800 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Dec 2006 07:19:12 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id 61A681A981A; Thu, 14 Dec 2006 07:18:26 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r487232 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Date: Thu, 14 Dec 2006 15:18:26 -0000 To: activemq-commits@geronimo.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20061214151826.61A681A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Thu Dec 14 07:18:25 2006 New Revision: 487232 URL: http://svn.apache.org/viewvc?view=rev&rev=487232 Log: synchronize around the send on a Message Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=487232&r1=487231&r2=487232 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu Dec 14 07:18:25 2006 @@ -199,6 +199,7 @@ protected boolean closed; protected boolean asyncDispatch; protected boolean sessionAsyncDispatch; + protected Object sendMutex = new Object(); /** * Construct the Session @@ -1493,92 +1494,87 @@ * message expiration. * @throws JMSException */ - protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, - int priority, long timeToLive) throws JMSException { - checkClosed(); - - if( destination.isTemporary() && connection.isDeleted(destination) ) { - throw new JMSException("Cannot publish to a deleted Destination: "+destination); - } - - // tell the Broker we are about to start a new transaction - doStartTransaction(); - TransactionId txid = transactionContext.getTransactionId(); - - message.setJMSDestination(destination); - message.setJMSDeliveryMode(deliveryMode); - long expiration = 0L; - - if (!producer.getDisableMessageTimestamp()) { - long timeStamp = System.currentTimeMillis(); - message.setJMSTimestamp(timeStamp); - if (timeToLive > 0) { - expiration = timeToLive + timeStamp; - } - } - - message.setJMSExpiration(expiration); - message.setJMSPriority(priority); - long sequenceNumber = producer.getMessageSequence(); - - message.setJMSRedelivered(false); - - // transform to our own message format here - ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); - // Set the message id. - if( msg == message ) { - msg.setMessageId( new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber) ); - } else { - msg.setMessageId( new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber) ); - message.setJMSMessageID(msg.getMessageId().toString()); - } - - msg.setTransactionId(txid); - - if ( connection.isCopyMessageOnSend() ){ - msg = (ActiveMQMessage) msg.copy(); - } - msg.onSend(); - msg.setProducerId(msg.getMessageId().getProducerId()); - - if (log.isDebugEnabled()) { - log.debug("Sending message: " + msg); - } - - if(!msg.isPersistent() || connection.isUseAsyncSend() || txid!=null) { - this.connection.asyncSendPacket(msg); - } else { - this.connection.syncSendPacket(msg); - } - - } - - /** - * Send TransactionInfo to indicate transaction has started - * - * @throws JMSException - * if some internal error occurs - */ - protected void doStartTransaction() throws JMSException { - if (getTransacted() && !transactionContext.isInXATransaction()) { - transactionContext.begin(); - } - } - - /** - * Checks whether the session has unconsumed messages. - * - * @return true - if there are unconsumed messages. - */ + protected void send(ActiveMQMessageProducer producer, + ActiveMQDestination destination,Message message,int deliveryMode, + int priority,long timeToLive) throws JMSException{ + checkClosed(); + if(destination.isTemporary()&&connection.isDeleted(destination)){ + throw new JMSException("Cannot publish to a deleted Destination: " + +destination); + } + synchronized(sendMutex){ + // tell the Broker we are about to start a new transaction + doStartTransaction(); + TransactionId txid=transactionContext.getTransactionId(); + message.setJMSDestination(destination); + message.setJMSDeliveryMode(deliveryMode); + long expiration=0L; + if(!producer.getDisableMessageTimestamp()){ + long timeStamp=System.currentTimeMillis(); + message.setJMSTimestamp(timeStamp); + if(timeToLive>0){ + expiration=timeToLive+timeStamp; + } + } + message.setJMSExpiration(expiration); + message.setJMSPriority(priority); + long sequenceNumber=producer.getMessageSequence(); + message.setJMSRedelivered(false); + // transform to our own message format here + ActiveMQMessage msg=ActiveMQMessageTransformation.transformMessage( + message,connection); + // Set the message id. + if(msg==message){ + msg.setMessageId(new MessageId(producer.getProducerInfo() + .getProducerId(),sequenceNumber)); + }else{ + msg.setMessageId(new MessageId(producer.getProducerInfo() + .getProducerId(),sequenceNumber)); + message.setJMSMessageID(msg.getMessageId().toString()); + } + msg.setTransactionId(txid); + if(connection.isCopyMessageOnSend()){ + msg=(ActiveMQMessage)msg.copy(); + } + msg.onSend(); + msg.setProducerId(msg.getMessageId().getProducerId()); + if(log.isDebugEnabled()){ + log.debug("Sending message: "+msg); + } + if(!msg.isPersistent()||connection.isUseAsyncSend()||txid!=null){ + this.connection.asyncSendPacket(msg); + }else{ + this.connection.syncSendPacket(msg); + } + } + } + + /** + * Send TransactionInfo to indicate transaction has started + * + * @throws JMSException + * if some internal error occurs + */ + protected void doStartTransaction() throws JMSException{ + if(getTransacted()&&!transactionContext.isInXATransaction()){ + transactionContext.begin(); + } + } + + /** + * Checks whether the session has unconsumed messages. + * + * @return true - if there are unconsumed messages. + */ public boolean hasUncomsumedMessages() { return executor.hasUncomsumedMessages(); } /** - * Checks whether the session uses transactions. - * - * @return true - if the session uses transactions. - */ + * Checks whether the session uses transactions. + * + * @return true - if the session uses transactions. + */ public boolean isTransacted() { return this.acknowledgementMode == Session.SESSION_TRANSACTED; }