activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r511085 - in /activemq/branches/activemq-4.1: ./ activemq-core/src/main/java/org/apache/activemq/ activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/activemq/broker/jmx/ activemq-core/src/main/jav...
Date Fri, 23 Feb 2007 20:24:31 GMT
Author: chirino
Date: Fri Feb 23 12:24:30 2007
New Revision: 511085

URL: http://svn.apache.org/viewvc?view=rev&rev=511085
Log:
 r241@34:  chirino | 2007-02-23 14:49:05 -0500
 Fixed some Stream tests that broke a little with the latest changes..
 Added better network flow control.
 

Modified:
    activemq/branches/activemq-4.1/   (props changed)
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionViewMBean.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java
    activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java

Propchange: activemq/branches/activemq-4.1/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Fri Feb 23 12:24:30 2007
@@ -1 +1 @@
-635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:240
+635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:241

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java?view=diff&rev=511085&r1=511084&r2=511085
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
Fri Feb 23 12:24:30 2007
@@ -50,8 +50,6 @@
     // These are the messages waiting to be delivered to the client
     private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
 
-    private int deliveredCounter = 0;
-    private MessageDispatch lastDelivered;
     private boolean eosReached;
     private byte buffer[];
     private int pos;
@@ -117,10 +115,6 @@
     public void close() throws IOException {
         if (!unconsumedMessages.isClosed()) {
             try {
-                if (lastDelivered != null) {
-                    MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE,
deliveredCounter);
-                    connection.asyncSendPacket(ack);
-                }
                 dispose();
                 this.connection.syncSendPacket(info.createRemoveCommand());
             } catch (JMSException e) {
@@ -150,16 +144,8 @@
         if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired())
             return null;
 
-        deliveredCounter++;
-        if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) {
-            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
-            connection.asyncSendPacket(ack);
-            deliveredCounter = 0;
-            lastDelivered = null;
-        } else {
-            lastDelivered = md;
-        }
-
+        MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
+        connection.asyncSendPacket(ack);
         return (ActiveMQMessage) md.getMessage();
     }
 

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?view=diff&rev=511085&r1=511084&r2=511085
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Fri Feb 23 12:24:30 2007
@@ -582,6 +582,9 @@
 
     public void dispose() throws JMSException {
         if (!unconsumedMessages.isClosed()) {
+        	
+        	//log.warn("Consumer is being disposed.", new Exception("trace exception."));
+        	
             // Do we have any acks we need to send out before closing?
             // Ack any delivered messages now. (session may still
             // commit/rollback the acks).
@@ -833,31 +836,36 @@
     public void dispatch(MessageDispatch md) {
         MessageListener listener = this.messageListener;
         try {
+        	
             synchronized(unconsumedMessages.getMutex()){
-	            if (!unconsumedMessages.isClosed()) {
-	                if (listener != null && unconsumedMessages.isRunning() ) {
-	                    ActiveMQMessage message = createActiveMQMessage(md);
-	                    beforeMessageIsConsumed(md);
-	                    try {
-	                        listener.onMessage(message);
-	                        afterMessageIsConsumed(md, false);
-	                    } catch (RuntimeException e) {
-	                        if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge()
) {
-	                            // Redeliver the message
-	                        } else {
-	                            // Transacted or Client ack: Deliver the next message.
-	                            afterMessageIsConsumed(md, false);
-	                        }
-	                        log.warn("Exception while processing message: " + e, e);
-	                    }
-	                } else {
-	                    unconsumedMessages.enqueue(md);
-	                    if (availableListener != null) {
-	                        availableListener.onMessageAvailable(this);
-	                    }
-	                }
+	            if (unconsumedMessages.isClosed()) {
+	            	return;
+            	}
+                if (listener == null || !unconsumedMessages.isRunning() ) {
+                    unconsumedMessages.enqueue(md);
+                    if (availableListener != null) {
+                        availableListener.onMessageAvailable(this);
+                    }
+                    return;
+                }                	
+        	}
+        	
+	        ActiveMQMessage message = createActiveMQMessage(md);
+	        beforeMessageIsConsumed(md);
+	        try {
+	            listener.onMessage(message);
+	            afterMessageIsConsumed(md, false);
+	        } catch (RuntimeException e) {
+	            if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) {
+	                // Redeliver the message
+	            } else {
+	                // Transacted or Client ack: Deliver the next message.
+	                afterMessageIsConsumed(md, false);
 	            }
-            }
+	            log.warn("Exception while processing message: " + e, e);
+	        }
+        	
+            
         } catch (Exception e) {
         	session.connection.onAsyncException(e);
         }

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=511085&r1=511084&r2=511085
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Fri Feb 23 12:24:30 2007
@@ -790,13 +790,10 @@
                 MessageDispatch md=(MessageDispatch) command;
                 Runnable sub=(Runnable) md.getConsumer();
                 broker.processDispatch(md);
-                try{
-                    dispatch(command);
-                }finally{
-                    if(sub!=null){
-                        sub.run();
-                    }
+                if(sub!=null){
+                    sub.run();
                 }
+                dispatch(command);
             } else if( command.isShutdownInfo() ) {
                 dispatch(command);
                 dispatchStopped.countDown();

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionViewMBean.java?view=diff&rev=511085&r1=511084&r2=511085
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionViewMBean.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionViewMBean.java
Fri Feb 23 12:24:30 2007
@@ -71,4 +71,11 @@
      */
     public String getRemoteAddress();
 
+    /**
+     * Returns the JMS connection id for this connection
+     * 
+     * @return the JMS connection id for this connection
+     */
+    public String getConnectionId();
+
 }

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=511085&r1=511084&r2=511085
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Feb 23 12:24:30 2007
@@ -280,8 +280,8 @@
 
     public void send(final ConnectionContext context, final Message message) throws Exception
{
 
-        if (context.isProducerFlowControl() && !context.isNetworkConnection()) {
-            if( message.isResponseRequired() ) {
+        if (context.isProducerFlowControl() ) {
+            if( message.isResponseRequired() || context.isNetworkConnection() ) {
             	if( usageManager.isFull() ) {
 //            		System.out.println("Registering callback...");
 	            	Runnable callback = new Runnable() {

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?view=diff&rev=511085&r1=511084&r2=511085
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
Fri Feb 23 12:24:30 2007
@@ -38,7 +38,6 @@
         super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
         // We should allow the following to be configurable via a Destination Policy 
         // setAutoCreateDestinations(false);
-        System.out.println("test");
     }
 
     protected Destination createDestination(ConnectionContext context, ActiveMQDestination
destination) throws Exception {

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=511085&r1=511084&r2=511085
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Fri Feb 23 12:24:30 2007
@@ -232,7 +232,7 @@
 
     public void send(final ConnectionContext context, final Message message) throws Exception
{
 
-        if (context.isProducerFlowControl()  && !context.isNetworkConnection() )
{
+        if (context.isProducerFlowControl() ) {
             if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
                 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit
reached");
             } else {

Modified: activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?view=diff&rev=511085&r1=511084&r2=511085
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Fri Feb 23 12:24:30 2007
@@ -490,8 +490,7 @@
                             log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+":
"+message);
                         
                         
-                        
-                        if( !message.isResponseRequired() ) {
+                        if( !( message.isResponseRequired() || message.getDestination().isQueue()
) ) {
                             
                             // If the message was originally sent using async send, we will
preserve that QOS
                             // by bridging it using an async send (small chance of message
loss).

Modified: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java?view=diff&rev=511085&r1=511084&r2=511085
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java
Fri Feb 23 12:24:30 2007
@@ -17,6 +17,7 @@
 * limitations under the License.
 */
 
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -59,99 +60,96 @@
         final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                 BROKER_URL);
 
-        final ActiveMQConnection connection = (ActiveMQConnection) factory
-                .createConnection();
+        final ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
         connection.start();
+        final ActiveMQConnection connection2 = (ActiveMQConnection) factory.createConnection();
+        connection2.start();
+        
         try {
-            final Session session = connection.createSession(false,
-                    Session.AUTO_ACKNOWLEDGE);
-            try {
-                final Destination destination = session.createQueue("wibble");
-                final Thread readerThread = new Thread(new Runnable() {
+            final Destination destination = new ActiveMQQueue("streamtest");
+            final Thread readerThread = new Thread(new Runnable() {
 
-                    public void run() {
-                        totalRead.set(0);
+                public void run() {
+                    totalRead.set(0);
+                    try {
+                        final InputStream inputStream = connection
+                                .createInputStream(destination);
                         try {
-                            final InputStream inputStream = connection
-                                    .createInputStream(destination);
-                            try {
-                                int read;
-                                final byte[] buf = new byte[BUFFER_SIZE];
-                                while (!stopThreads.get()
-                                        && (read = inputStream.read(buf)) != -1)
{
-                                    totalRead.addAndGet(read);
-                                }
-                            } finally {
-                                inputStream.close();
+                            int read;
+                            final byte[] buf = new byte[BUFFER_SIZE];
+                            while (!stopThreads.get()
+                                    && (read = inputStream.read(buf)) != -1) {
+                                totalRead.addAndGet(read);
                             }
-                        } catch (Exception e) {
-                            readerException  = e;
-                            e.printStackTrace();
                         } finally {
-                            log.info(totalRead + " total bytes read.");
+                            inputStream.close();
                         }
+                    } catch (Exception e) {
+                        readerException  = e;
+                        e.printStackTrace();
+                    } finally {
+                        log.info(totalRead + " total bytes read.");
                     }
-                });
+                }
+            });
 
-                final Thread writerThread = new Thread(new Runnable() {
+            final Thread writerThread = new Thread(new Runnable() {
 
-                    public void run() {
-                        totalWritten.set(0);
-                        int count = MESSAGE_COUNT;
+                public void run() {
+                    totalWritten.set(0);
+                    int count = MESSAGE_COUNT;
+                    try {
+                        final OutputStream outputStream = connection2
+                                .createOutputStream(destination);
                         try {
-                            final OutputStream outputStream = connection
-                                    .createOutputStream(destination);
-                            try {
-                                final byte[] buf = new byte[BUFFER_SIZE];
-                                new Random().nextBytes(buf);
-                                while (count > 0 && !stopThreads.get()) {
-                                    outputStream.write(buf);
-                                    totalWritten.addAndGet(buf.length);
-                                    count--;
-                                }
-                            } finally {
-                                outputStream.close();
+                            final byte[] buf = new byte[BUFFER_SIZE];
+                            new Random().nextBytes(buf);
+                            while (count > 0 && !stopThreads.get()) {
+                                outputStream.write(buf);
+                                totalWritten.addAndGet(buf.length);
+                                count--;
                             }
-                        } catch (Exception e) {
-                            writerException = e;
-                            e.printStackTrace();
                         } finally {
-                            log.info(totalWritten
-                                    + " total bytes written.");
+                            outputStream.close();
                         }
+                    } catch (Exception e) {
+                        writerException = e;
+                        e.printStackTrace();
+                    } finally {
+                        log.info(totalWritten
+                                + " total bytes written.");
                     }
-                });
+                }
+            });
 
-                readerThread.start();
-                writerThread.start();
+            readerThread.start();
+            writerThread.start();
 
-                
-                // Wait till reader is has finished receiving all the messages or he has
stopped
-                // receiving messages.
-                Thread.sleep(1000);
-                int lastRead = totalRead.get();
-                while( readerThread.isAlive() ) {
-                    readerThread.join(1000);
-                    // No progress?? then stop waiting..
-                    if( lastRead == totalRead.get() ) {
-                        break;
-                    }
-                    lastRead = totalRead.get();
+            
+            // Wait till reader is has finished receiving all the messages or he has stopped
+            // receiving messages.
+            Thread.sleep(1000);
+            int lastRead = totalRead.get();
+            while( readerThread.isAlive() ) {
+                readerThread.join(1000);
+                // No progress?? then stop waiting..
+                if( lastRead == totalRead.get() ) {
+                    break;
                 }
-                
-                stopThreads.set(true);
+                lastRead = totalRead.get();
+            }
+            
+            stopThreads.set(true);
 
-                assertTrue("Should not have received a reader exception", readerException
== null);
-                assertTrue("Should not have received a writer exception", writerException
== null);
+            assertTrue("Should not have received a reader exception", readerException ==
null);
+            assertTrue("Should not have received a writer exception", writerException ==
null);
+            
+            Assert.assertEquals("Not all messages accounted for", 
+                    totalWritten.get(), totalRead.get());
                 
-                Assert.assertEquals("Not all messages accounted for", 
-                        totalWritten.get(), totalRead.get());
-                
-            } finally {
-                session.close();
-            }
         } finally {
             connection.close();
+            connection2.close();
         }
     }
 

Modified: activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java?view=diff&rev=511085&r1=511084&r2=511085
==============================================================================
--- activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
(original)
+++ activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
Fri Feb 23 12:24:30 2007
@@ -135,7 +135,7 @@
     protected BrokerService createBroker() throws Exception {
         BrokerService service = new BrokerService();
         service.setPersistent(false);
-        service.setUseJmx(false);
+        service.setUseJmx(true);
         
         // Setup a destination policy where it takes only 1 message at a time.
         PolicyMap policyMap = new PolicyMap();        



Mime
View raw message