Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 24449 invoked from network); 17 Jul 2009 23:17:54 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 17 Jul 2009 23:17:54 -0000 Received: (qmail 86331 invoked by uid 500); 17 Jul 2009 23:18:59 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 86276 invoked by uid 500); 17 Jul 2009 23:18:59 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 86267 invoked by uid 99); 17 Jul 2009 23:18:59 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Jul 2009 23:18:59 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Jul 2009 23:18:50 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 6A46A2388980; Fri, 17 Jul 2009 23:18:30 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r795270 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/thread/ main/java/org/apache/activemq/util/ test/java/org/apache/activemq/usec... 
Date: Fri, 17 Jul 2009 23:18:30 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090717231830.6A46A2388980@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Fri Jul 17 23:18:29 2009 New Revision: 795270 URL: http://svn.apache.org/viewvc?rev=795270&view=rev Log: use non-compensating scheduler and ensure DLQ copies message early - ensures accurate processing of expired messages - https://issues.apache.org/activemq/browse/AMQ-1112 Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=795270&r1=795269&r2=795270&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jul 17 23:18:29 2009 @@ -187,7 +187,7 @@ } if (getExpireMessagesPeriod() > 0) { - scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod()); + 
scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod()); } super.initialize(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=795270&r1=795269&r2=795270&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Fri Jul 17 23:18:29 2009 @@ -45,15 +45,18 @@ * @throws IOException */ protected void acknowledge(final ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException { - if (n.isExpired()) { - if (!broker.isExpired(n)) { - LOG.info("ignoring ack " + ack + ", for already expired message: " + n); - return; - } - } final Destination q = n.getRegionDestination(); final QueueMessageReference node = (QueueMessageReference)n; final Queue queue = (Queue)q; + + if (n.isExpired()) { + if (broker.isExpired(n)) { + queue.messageExpired(context, this, node); + } else { + LOG.debug("ignoring ack " + ack + ", for already expired message: " + n); + } + return; + } queue.removeMessage(context, this, node, ack); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=795270&r1=795269&r2=795270&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri Jul 17 
23:18:29 2009 @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; + import javax.jms.InvalidClientIDException; import javax.jms.JMSException; import org.apache.activemq.broker.Broker; @@ -682,14 +683,14 @@ private boolean stampAsExpired(Message message) throws IOException { boolean stamped=false; if (message.getProperty(ORIGINAL_EXPIRATION) == null) { - long expiration=message.getExpiration(); - message.setExpiration(0); + long expiration=message.getExpiration(); message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration)); stamped = true; } return stamped; } + public void messageExpired(ConnectionContext context, MessageReference node) { if (LOG.isDebugEnabled()) { LOG.debug("Message expired " + node); @@ -708,11 +709,10 @@ .getRegionDestination().getDeadLetterStrategy(); if(deadLetterStrategy!=null){ if(deadLetterStrategy.isSendToDeadLetterQueue(message)){ - if (node.getRegionDestination().getActiveMQDestination().isTopic()) { - // message may be inflight to other subscriptions so do not modify - message = message.copy(); - } - if(!message.isPersistent()){ + // message may be inflight to other subscriptions so do not modify + message = message.copy(); + message.setExpiration(0); + if(!message.isPersistent()){ message.setPersistent(true); message.setProperty("originalDeliveryMode", "NON_PERSISTENT"); @@ -727,7 +727,7 @@ if (context.getBroker()==null) { context.setBroker(getRoot()); } - BrokerSupport.resend(context,message, + BrokerSupport.resendNoCopy(context,message, deadLetterDestination); } } else { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=795270&r1=795269&r2=795270&view=diff ============================================================================== --- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Jul 17 23:18:29 2009 @@ -307,8 +307,8 @@ // While waiting for space to free up... the // message may have expired. if (message.isExpired()) { - getDestinationStatistics().getExpired().increment(); broker.messageExpired(context, message); + getDestinationStatistics().getExpired().increment(); } else { doMessageSend(producerExchange, message); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=795270&r1=795269&r2=795270&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Fri Jul 17 23:18:29 2009 @@ -88,7 +88,6 @@ private transient ActiveMQConnection connection; private transient org.apache.activemq.broker.region.Destination regionDestination; private transient MemoryUsage memoryUsage; - private transient boolean expired; private BrokerId[] brokerPath; private BrokerId[] cluster; @@ -339,9 +338,6 @@ public void setExpiration(long expiration) { this.expiration = expiration; - if (this.expiration > 0) { - expired = false; - } } /** @@ -439,13 +435,8 @@ } public boolean isExpired() { - if (!expired) { - long expireTime = getExpiration(); - if (expireTime > 0 && System.currentTimeMillis() > expireTime) { - expired = true; - } - } - return expired; + long expireTime = getExpiration(); + return expireTime > 0 && System.currentTimeMillis() > expireTime; } public boolean isAdvisory() { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?rev=795270&r1=795269&r2=795270&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Fri Jul 17 23:18:29 2009 @@ -47,6 +47,16 @@ TIMER_TASKS.put(task, timerTask); } + /* + * execute on rough schedual based on termination of last execution. There is no + * compensation (two runs in quick succession) for delays + */ + public synchronized void schedualPeriodically(final Runnable task, long period) { + TimerTask timerTask = new SchedulerTimerTask(task); + CLOCK_DAEMON.schedule(timerTask, period, period); + TIMER_TASKS.put(task, timerTask); + } + public synchronized void cancel(Runnable task) { TimerTask ticket = TIMER_TASKS.remove(task); if (ticket != null) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java?rev=795270&r1=795269&r2=795270&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java Fri Jul 17 23:18:29 2009 @@ -32,6 +32,10 @@ private BrokerSupport() { } + public static void resendNoCopy(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination) throws Exception { + doResend(context, originalMessage, deadLetterDestination, false); + } + /** * @param context * @param originalMessage @@ -39,7 +43,11 @@ * @throws Exception */ public static void resend(final ConnectionContext context, Message 
originalMessage, ActiveMQDestination deadLetterDestination) throws Exception { - Message message = originalMessage.copy(); + doResend(context, originalMessage, deadLetterDestination, true); + } + + public static void doResend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) throws Exception { + Message message = copy ? originalMessage.copy() : originalMessage; message.setOriginalDestination(message.getDestination()); message.setOriginalTransactionId(message.getTransactionId()); message.setDestination(deadLetterDestination); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=795270&r1=795269&r2=795270&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Fri Jul 17 23:18:29 2009 @@ -49,12 +49,13 @@ private static final Log LOG = LogFactory.getLog(ExpiredMessagesTest.class); - BrokerService broker; - Connection connection; - Session session; - MessageProducer producer; - MessageConsumer consumer; - public ActiveMQDestination destination = new ActiveMQQueue("test"); + BrokerService broker; + Connection connection; + Session session; + MessageProducer producer; + MessageConsumer consumer; + public ActiveMQDestination destination = new ActiveMQQueue("test"); + public ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ"); public boolean useTextMessage = true; public boolean useVMCursor = true; @@ -103,12 +104,12 @@ consumerThread.start(); - + final int numMessagesToSend = 10000; Thread producingThread = new Thread("Producing Thread") { public void run() { try 
{ int i = 0; - while (i++ < 10000) { + while (i++ < numMessagesToSend) { producer.send(session.createTextMessage("test")); } producer.close(); @@ -159,10 +160,36 @@ return view.getQueueSize() == 0; } })); + + final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueueCount(); + final long totalExpiredCount = view.getExpiredCount() + expiredBeforeEnqueue; + + final DestinationViewMBean dlqView = createView(dlqDestination); + LOG.info("DLQ stats: size= " + dlqView.getQueueSize() + ", enqueues: " + dlqView.getDequeueCount() + ", dequeues: " + dlqView.getDequeueCount() + + ", dispatched: " + dlqView.getDispatchCount() + ", inflight: " + dlqView.getInFlightCount() + ", expiries: " + dlqView.getExpiredCount()); + + Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return totalExpiredCount == dlqView.getQueueSize(); + } + }); + assertEquals("dlq contains all expired", totalExpiredCount, dlqView.getQueueSize()); + + // verify DQL + MessageConsumer dlqConsumer = createDlqConsumer(connection); + int count = 0; + while (dlqConsumer.receive(4000) != null) { + count++; + } + assertEquals("dlq returned all expired", count, totalExpiredCount); } - public void initCombosForTestRecoverExpiredMessages() { + private MessageConsumer createDlqConsumer(Connection connection) throws Exception { + return connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(dlqDestination); + } + + public void initCombosForTestRecoverExpiredMessages() { addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE}); } @@ -266,9 +293,9 @@ String domain = "org.apache.activemq"; ObjectName name; if (destination.isQueue()) { - name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test"); + name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=" + destination.getPhysicalName()); } else { - name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test"); + name = 
new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=" + destination.getPhysicalName()); } return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true); }