activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r383957 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/LargeStreamletTest.java
Date Tue, 07 Mar 2006 18:59:42 GMT
Author: chirino
Date: Tue Mar  7 10:59:41 2006
New Revision: 383957

URL: http://svn.apache.org/viewcvs?rev=383957&view=rev
Log:
Test case and fix for http://jira.activemq.org/jira/browse/AMQ-618
Robert Newson, great job finding and putting together a test case for this.

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=383957&r1=383956&r2=383957&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Mar  7 10:59:41 2006
@@ -292,9 +292,20 @@
         }
     }
 
-    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
-            final MessageReference node) throws IOException {
+    public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
         if (store != null && node.isPersistent()) {
+            // the original ack may be a ranged ack, but we are trying to delete a specific
+            // message store here so we need to convert to a non ranged ack.
+            if( ack.getMessageCount() > 0 ) {
+                // Dup the ack
+                MessageAck a = new MessageAck();
+                ack.copy(a);
+                ack = a;
+                // Convert to non-ranged.
+                ack.setFirstMessageId(node.getMessageId());
+                ack.setLastMessageId(node.getMessageId());
+                ack.setMessageCount(1);
+            }
             store.removeMessage(context, ack);
         }
     }

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java?rev=383957&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java Tue Mar  7 10:59:41 2006
@@ -0,0 +1,135 @@
+package org.apache.activemq;
+/**
+*
+* Copyright 2005-2006 The Apache Software Foundation
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Destination;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+/**
+ * @author rnewson
+ */
+public final class LargeStreamletTest extends TestCase {
+
+    private static final String BROKER_URL = "vm://localhost?broker.persistent=false";
+
+    private static final int BUFFER_SIZE = 1 * 1024;
+
+    private static final int MESSAGE_COUNT = 1024*1024;
+    
+    private int totalRead;
+
+    private int totalWritten;
+
+    private AtomicBoolean stopThreads = new AtomicBoolean(false);
+
+    public void testStreamlets() throws Exception {
+        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                BROKER_URL);
+
+        final ActiveMQConnection connection = (ActiveMQConnection) factory
+                .createConnection();
+        connection.start();
+        try {
+            final Session session = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            try {
+                final Destination destination = session.createQueue("wibble");
+                final Thread readerThread = new Thread(new Runnable() {
+
+                    public void run() {
+                        totalRead = 0;
+                        try {
+                            final InputStream inputStream = connection
+                                    .createInputStream(destination);
+                            try {
+                                int read;
+                                final byte[] buf = new byte[BUFFER_SIZE];
+                                while (!stopThreads.get()
+                                        && (read = inputStream.read(buf)) != -1) {
+                                    totalRead += read;
+                                }
+                            } finally {
+                                inputStream.close();
+                            }
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        } finally {
+                            System.err
+                                    .println(totalRead + " total bytes read.");
+                        }
+                    }
+                });
+
+                final Thread writerThread = new Thread(new Runnable() {
+
+                    public void run() {
+                        totalWritten = 0;
+                        int count = MESSAGE_COUNT;
+                        try {
+                            final OutputStream outputStream = connection
+                                    .createOutputStream(destination);
+                            try {
+                                final byte[] buf = new byte[BUFFER_SIZE];
+                                new Random().nextBytes(buf);
+                                while (count > 0 && !stopThreads.get()) {
+                                    outputStream.write(buf);
+                                    totalWritten += buf.length;
+                                    count--;
+                                }
+                            } finally {
+                                outputStream.close();
+                            }
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        } finally {
+                            System.err.println(totalWritten
+                                    + " total bytes written.");
+                        }
+                    }
+                });
+
+                readerThread.start();
+                writerThread.start();
+
+                readerThread.join(30*1000);
+                writerThread.join(10);
+                
+                stopThreads.set(true);
+                                
+                Assert.assertEquals("Not all messages accounted for", 
+                        totalWritten, totalRead);
+                
+            } finally {
+                session.close();
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+}



Mime
View raw message