Author: dejanb Date: Tue Dec 16 03:02:28 2008 New Revision: 727017 URL: http://svn.apache.org/viewvc?rev=727017&view=rev Log: fix for prefetch size issue reported in AMQ-1807 Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=727017&r1=727016&r2=727017&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Dec 16 03:02:28 2008 @@ -243,11 +243,11 @@ // consumer if (getPrefetchSize() != 0) { prefetchExtension = Math.max( - prefetchExtension, index + 1); + prefetchExtension, index ); } } else { prefetchExtension = Math.max(0, - prefetchExtension - (index + 1)); + prefetchExtension - index); } destination = node.getRegionDestination(); callDispatchMatched = true; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=727017&r1=727016&r2=727017&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Tue Dec 16 03:02:28 2008 @@ -112,6 +112,11 @@ headers.put("passcode", password); StompFrame frame = new StompFrame("CONNECT", headers); sendFrame(frame.toString()); + + StompFrame connect = receive(); + if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) { + throw new Exception ("Not connected: " + connect.getBody()); + } } public void disconnect() throws Exception { Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=727017&r1=727016&r2=727017&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Tue Dec 16 03:02:28 2008 @@ -884,7 +884,7 @@ stompConnection.sendFrame(frame); // wait a bit for MBean to get refreshed try { - Thread.sleep(100); + Thread.sleep(200); } catch (InterruptedException e){} assertEquals(view.getDurableTopicSubscribers().length, 1); @@ -892,7 +892,7 @@ frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); try { - Thread.sleep(100); + Thread.sleep(200); } catch (InterruptedException e){} //reconnect @@ -920,17 +920,41 @@ stompConnection.begin("tx1"); stompConnection.send("/queue/" + getQueueName(), "msg", "tx1", null); stompConnection.commit("tx1"); - - StompFrame connect = stompConnection.receive(); - if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) { - throw new Exception ("Not connected"); - } stompConnection.subscribe("/queue/" + getQueueName()); StompFrame stompMessage = stompConnection.receive(); assertNull(stompMessage.getHeaders().get("transaction")); } + public void testPrefetchSize() throws Exception { + stompConnection.connect("system", "manager"); + + HashMap headers = new HashMap(); + headers.put("activemq.prefetchSize", "1"); + stompConnection.subscribe("/queue/" + getQueueName(), "client", headers); + + // send messages using JMS + sendMessage("message 1"); + sendMessage("message 2"); + sendMessage("message 3"); + + StompFrame frame = stompConnection.receive(); + + stompConnection.begin("tx1"); + stompConnection.ack(frame, "tx1"); + + StompFrame frame1 = stompConnection.receive(); + + try { + StompFrame frame2 = stompConnection.receive(500); + if (frame2 != null) { + fail("Should not have received the second message"); + } + } catch (SocketTimeoutException soe) {} + stompDisconnect(); + + } + protected void assertClients(int expected) throws Exception { org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients(); int actual = clients.length;