From commits-return-19152-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Mon Jul 2 19:36:48 2012 Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 50C799D45 for ; Mon, 2 Jul 2012 19:36:48 +0000 (UTC) Received: (qmail 71974 invoked by uid 500); 2 Jul 2012 19:36:48 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 71935 invoked by uid 500); 2 Jul 2012 19:36:48 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 71927 invoked by uid 99); 2 Jul 2012 19:36:48 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Jul 2012 19:36:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Jul 2012 19:36:46 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 42B9323888EA for ; Mon, 2 Jul 2012 19:36:26 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1356431 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/transport/stomp/ Date: Mon, 02 Jul 2012 19:36:25 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120702193626.42B9323888EA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Mon Jul 2 19:36:24 2012 New Revision: 1356431 URL: http://svn.apache.org/viewvc?rev=1356431&view=rev Log: fix and tests for: https://issues.apache.org/jira/browse/AMQ-3909 Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1356431&r1=1356430&r2=1356431&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Jul 2 19:36:24 2012 @@ -808,4 +808,14 @@ public abstract class PrefetchSubscripti protected int getPrefetchExtension() { return this.prefetchExtension.get(); } + + @Override + public void setPrefetchSize(int prefetchSize) { + this.info.setPrefetchSize(prefetchSize); + try { + this.dispatchPending(); + } catch (Exception e) { + LOG.trace("Caught exception during dispatch after prefetch change.", e); + } + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1356431&r1=1356430&r2=1356431&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Jul 2 19:36:24 2012 @@ -18,7 +18,6 @@ package org.apache.activemq.broker.regio import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -148,35 +147,35 @@ public class Topic extends BaseDestinati } else { DurableTopicSubscription dsub = (DurableTopicSubscription) sub; super.addSubscription(context, sub); - sub.add(context, this); - if(dsub.isActive()) { - synchronized (consumers) { - boolean hasSubscription = false; - - if(consumers.size()==0) { - hasSubscription = false; - } else { - for(Subscription currentSub : consumers) { - if(currentSub.getConsumerInfo().isDurable()) { - DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub; - if(dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) { - hasSubscription = true; - break; - } - } - } - } - - if(!hasSubscription) - consumers.add(sub); - } - } + sub.add(context, this); + if(dsub.isActive()) { + synchronized (consumers) { + boolean hasSubscription = false; + + if (consumers.size() == 0) { + hasSubscription = false; + } else { + for (Subscription currentSub : consumers) { + if (currentSub.getConsumerInfo().isDurable()) { + DurableTopicSubscription dcurrentSub = (DurableTopicSubscription) currentSub; + if (dcurrentSub.getSubscriptionKey().equals(dsub.getSubscriptionKey())) { + hasSubscription = true; + break; + } + } + } + } + + if (!hasSubscription) { + consumers.add(sub); + } + } + } durableSubcribers.put(dsub.getSubscriptionKey(), dsub); } } - public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) - throws Exception { + public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { if (!sub.getConsumerInfo().isDurable()) { super.removeSubscription(context, sub, lastDeliveredSequenceId); synchronized (consumers) { @@ -332,9 +331,7 @@ public class Topic extends BaseDestinati + " See http://activemq.apache.org/producer-flow-control.html for more info"); } - // We can avoid blocking due to low usage if the producer is - // sending - // a sync message or + // We can avoid blocking due to low usage if the producer is sending a sync message or // if it is using a producer window if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) { synchronized (messagesWaitingForSpace) { @@ -378,10 +375,8 @@ public class Topic extends BaseDestinati } } else { - // Producer flow control cannot be used, so we have do the - // flow - // control at the broker - // by blocking this thread until there is space available. + // Producer flow control cannot be used, so we have do the flow control + // at the broker by blocking this thread until there is space available. if (memoryUsage.isFull()) { if (context.isInTransaction()) { @@ -763,17 +758,6 @@ public class Topic extends BaseDestinati } } - - private void clearPendingMessages(SubscriptionKey subscriptionKey) { - dispatchLock.readLock().lock(); - try { - DurableTopicSubscription durableTopicSubscription = durableSubcribers.get(subscriptionKey); - clearPendingAndDispatch(durableTopicSubscription); - } finally { - dispatchLock.readLock().unlock(); - } - } - private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) { synchronized (durableTopicSubscription.pendingLock) { durableTopicSubscription.pending.clear(); @@ -790,5 +774,4 @@ public class Topic extends BaseDestinati public Map getDurableTopicSubs() { return durableSubcribers; } - } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1356431&r1=1356430&r2=1356431&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Mon Jul 2 19:36:24 2012 @@ -19,7 +19,9 @@ package org.apache.activemq.broker.regio import java.io.IOException; import java.util.LinkedList; import java.util.concurrent.atomic.AtomicLong; + import javax.jms.JMSException; + import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; @@ -45,11 +47,11 @@ public class TopicSubscription extends A private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class); private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0); - + protected PendingMessageCursor matched; protected final SystemUsage usageManager; protected AtomicLong dispatchedCounter = new AtomicLong(); - + boolean singleDestination = true; Destination destination; @@ -99,9 +101,9 @@ public class TopicSubscription extends A dispatch(node); setSlowConsumer(false); } else { - if ( info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize() ) { - //we are slow - if(!isSlowConsumer()) { + if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) { + // Slow consumers should log and set their state as such. + if (!isSlowConsumer()) { LOG.warn(toString() + ": has twice its prefetch limit pending, without an ack; it appears to be slow"); setSlowConsumer(true); for (Destination dest: destinations) { @@ -131,15 +133,14 @@ public class TopicSubscription extends A } matchedListMutex.wait(20); } - //Temporary storage could be full - so just try to add the message - //see https://issues.apache.org/activemq/browse/AMQ-2475 + // Temporary storage could be full - so just try to add the message + // see https://issues.apache.org/activemq/browse/AMQ-2475 if (matched.tryAddMessageLast(node, 10)) { break; } } } synchronized (matchedListMutex) { - // NOTE - be careful about the slaveBroker! if (maximumPendingMessages > 0) { // calculate the high water mark from which point we @@ -154,28 +155,26 @@ public class TopicSubscription extends A // lets discard old messages as we are a slow consumer while (!matched.isEmpty() && matched.size() > maximumPendingMessages) { int pageInSize = matched.size() - maximumPendingMessages; - // only page in a 1000 at a time - else we could - // blow da memory + // only page in a 1000 at a time - else we could blow the memory pageInSize = Math.max(1000, pageInSize); LinkedList list = null; MessageReference[] oldMessages=null; synchronized(matched){ list = matched.pageInList(pageInSize); - oldMessages = messageEvictionStrategy.evictMessages(list); - for (MessageReference ref : list) { - ref.decrementReferenceCount(); - } + oldMessages = messageEvictionStrategy.evictMessages(list); + for (MessageReference ref : list) { + ref.decrementReferenceCount(); + } } int messagesToEvict = 0; if (oldMessages != null){ - messagesToEvict = oldMessages.length; - for (int i = 0; i < messagesToEvict; i++) { - MessageReference oldMessage = oldMessages[i]; - discard(oldMessage); - } + messagesToEvict = oldMessages.length; + for (int i = 0; i < messagesToEvict; i++) { + MessageReference oldMessage = oldMessages[i]; + discard(oldMessage); + } } - // lets avoid an infinite loop if we are given a bad - // eviction strategy + // lets avoid an infinite loop if we are given a bad eviction strategy // for a bad strategy lets just not evict if (messagesToEvict == 0) { LOG.warn("No messages to evict returned for " + destination + " from eviction strategy: " + messageEvictionStrategy + " out of " + list.size() + " candidates"); @@ -205,7 +204,7 @@ public class TopicSubscription extends A /** * Discard any expired messages from the matched list. Called from a * synchronized block. - * + * * @throws IOException */ protected void removeExpiredMessages() throws IOException { @@ -275,12 +274,11 @@ public class TopicSubscription extends A dispatchMatched(); return; } else if (ack.isDeliveredAck()) { - // Message was delivered but not acknowledged: update pre-fetch - // counters. + // Message was delivered but not acknowledged: update pre-fetch counters. // also. get these for a consumer expired message. if (destination != null && !ack.isInTransaction()) { destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); - destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); + destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); } dequeueCounter.addAndGet(ack.getMessageCount()); dispatchMatched(); @@ -375,36 +373,35 @@ public class TopicSubscription extends A public int getMaxAuditDepth() { return maxAuditDepth; } - + public synchronized void setMaxAuditDepth(int maxAuditDepth) { this.maxAuditDepth = maxAuditDepth; if (audit != null) { audit.setAuditDepth(maxAuditDepth); } } - + public boolean isEnableAudit() { return enableAudit; } public synchronized void setEnableAudit(boolean enableAudit) { this.enableAudit = enableAudit; - if (enableAudit && audit==null) { + if (enableAudit && audit == null) { audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); } } - + // Implementation methods // ------------------------------------------------------------------------- public boolean isFull() { - return getDispatchedQueueSize() >= info.getPrefetchSize(); + return getDispatchedQueueSize() >= info.getPrefetchSize(); } - + public int getInFlightSize() { return getDispatchedQueueSize(); } - - + /** * @return true when 60% or more room is left for dispatching messages */ @@ -456,7 +453,7 @@ public class TopicSubscription extends A /** * inform the MessageConsumer on the client to change it's prefetch - * + * * @param newPrefetch */ public void updateConsumerPrefetch(int newPrefetch) { @@ -468,18 +465,17 @@ public class TopicSubscription extends A } } - private void dispatchMatched() throws IOException { + private void dispatchMatched() throws IOException { synchronized (matchedListMutex) { if (!matched.isEmpty() && !isFull()) { try { matched.reset(); - + while (matched.hasNext() && !isFull()) { MessageReference message = matched.next(); message.decrementReferenceCount(); matched.remove(); - // Message may have been sitting in the matched list a - // while + // Message may have been sitting in the matched list a while // waiting for the consumer to ak the message. if (message.isExpired()) { discard(message); @@ -503,8 +499,7 @@ public class TopicSubscription extends A md.setConsumerId(info.getConsumerId()); md.setDestination(node.getRegionDestination().getActiveMQDestination()); dispatchedCounter.incrementAndGet(); - // Keep track if this subscription is receiving messages from a single - // destination. + // Keep track if this subscription is receiving messages from a single destination. if (singleDestination) { if (destination == null) { destination = node.getRegionDestination(); @@ -572,4 +567,13 @@ public class TopicSubscription extends A return info.getPrefetchSize(); } + @Override + public void setPrefetchSize(int newSize) { + info.setPrefetchSize(newSize); + try { + dispatchMatched(); + } catch(Exception e) { + LOG.trace("Caught exception on dispatch after prefetch size change."); + } + } } Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java?rev=1356431&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java Mon Jul 2 19:36:24 2012 @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.util.HashMap; +import java.util.UUID; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StompMissingMessageTest { + + private static final Logger LOG = LoggerFactory.getLogger(StompMissingMessageTest.class); + + protected String bindAddress = "stomp://localhost:61613"; + protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml"; + protected String jmsUri = "vm://localhost"; + + private BrokerService broker; + protected String destination; + + @Before + public void setUp() throws Exception { + broker = BrokerFactory.createBroker(new URI(confUri)); + broker.setDeleteAllMessagesOnStartup(true); + broker.start(); + broker.waitUntilStarted(); + + destination = "/topic/" + getTopicName(); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test + public void testProducerConsumerLoop() throws Exception { + final int ITERATIONS = 500; + int received = 0; + + for (int i = 1; i <= ITERATIONS*2; i+=2) { + if (doTestProducerConsumer(i) != null) { + received++; + } + } + + assertEquals(ITERATIONS, received); + } + + public String doTestProducerConsumer(int index) throws Exception { + String message = null; + + assertEquals("Should not be any consumers", 0, broker.getAdminView().getTopicSubscribers().length); + + StompConnection producer = stompConnect(); + StompConnection consumer = stompConnect(); + + subscribe(consumer, Integer.toString(index)); + + sendMessage(producer, index); + + try { + StompFrame frame = consumer.receive(); + LOG.debug("Consumer got frame: " + message); + assertEquals(index, (int) Integer.valueOf(frame.getBody())); + message = frame.getBody(); + } catch(Exception e) { + fail("Consumer["+index+"] got error while consuming: " + e.getMessage()); + } + + unsubscribe(consumer, Integer.toString(index)); + + stompDisconnect(consumer); + stompDisconnect(producer); + + return message; + } + + @Test + public void testProducerDurableConsumerLoop() throws Exception { + final int ITERATIONS = 500; + int received = 0; + + for (int i = 1; i <= ITERATIONS*2; i+=2) { + if (doTestProducerDurableConsumer(i) != null) { + received++; + } + } + + assertEquals(ITERATIONS, received); + } + + public String doTestProducerDurableConsumer(int index) throws Exception { + String message = null; + + assertEquals("Should not be any consumers", 0, broker.getAdminView().getTopicSubscribers().length); + + StompConnection producer = stompConnect(); + StompConnection consumer = stompConnect("test"); + + subscribe(consumer, Integer.toString(index), true); + + sendMessage(producer, index); + + try { + StompFrame frame = consumer.receive(); + LOG.debug("Consumer got frame: " + message); + assertEquals(index, (int) Integer.valueOf(frame.getBody())); + message = frame.getBody(); + } catch(Exception e) { + fail("Consumer["+index+"] got error while consuming: " + e.getMessage()); + } + + unsubscribe(consumer, Integer.toString(index)); + + stompDisconnect(consumer); + stompDisconnect(producer); + + return message; + } + + protected void subscribe(StompConnection stompConnection, String subscriptionId) throws Exception { + subscribe(stompConnection, subscriptionId, false); + } + + protected void subscribe(StompConnection stompConnection, String subscriptionId, boolean durable) throws Exception { + HashMap headers = new HashMap(); + headers.put("id", subscriptionId); + if (durable) { + headers.put("activemq.subscriptionName", subscriptionId); + } + headers.put(Stomp.Headers.RECEIPT_REQUESTED, UUID.randomUUID().toString()); + + stompConnection.subscribe(destination, "auto", headers); + + StompFrame received = stompConnection.receive(); + assertEquals("RECEIPT", received.getAction()); + String receipt = received.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID); + assertEquals(headers.get(Stomp.Headers.RECEIPT_REQUESTED), receipt); + } + + protected void unsubscribe(StompConnection stompConnection, String subscriptionId) throws Exception { + HashMap headers = new HashMap(); + headers.put("id", subscriptionId); + headers.put(Stomp.Headers.RECEIPT_REQUESTED, UUID.randomUUID().toString()); + + stompConnection.unsubscribe(destination, headers); + + StompFrame received = stompConnection.receive(); + assertEquals("RECEIPT", received.getAction()); + String receipt = received.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID); + assertEquals(headers.get(Stomp.Headers.RECEIPT_REQUESTED), receipt); + } + + protected void sendMessage(StompConnection producer, int index) throws Exception { + HashMap headers = new HashMap(); + headers.put(Stomp.Headers.RECEIPT_REQUESTED, UUID.randomUUID().toString()); + + producer.send(destination, Integer.toString(index), null, headers); + + StompFrame received = producer.receive(); + assertEquals("RECEIPT", received.getAction()); + String receipt = received.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID); + assertEquals(headers.get(Stomp.Headers.RECEIPT_REQUESTED), receipt); + } + + protected StompConnection stompConnect() throws Exception { + return stompConnect(null); + } + + protected StompConnection stompConnect(String clientId) throws Exception { + StompConnection stompConnection = new StompConnection(); + URI connectUri = new URI(bindAddress); + stompConnection.open(createSocket(connectUri)); + stompConnection.connect("system", "manager", clientId); + return stompConnection; + } + + protected Socket createSocket(URI connectUri) throws IOException { + return new Socket("127.0.0.1", connectUri.getPort()); + } + + protected String getTopicName() { + return getClass().getName() + ".Messages"; + } + + protected void stompDisconnect(StompConnection connection) throws Exception { + if (connection != null) { + String receiptId = UUID.randomUUID().toString(); + connection.disconnect(receiptId); + if (!connection.receive().getAction().equals(Stomp.Responses.RECEIPT)) { + throw new Exception("Failed to receive receipt for disconnect."); + } + connection.close(); + connection = null; + } + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java ------------------------------------------------------------------------------ svn:eol-style = native