activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r518745 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/ broker/region/ broker/region/cursors/ command/ network/ openwire/ util/
Date Thu, 15 Mar 2007 20:15:05 GMT
Author: chirino
Date: Thu Mar 15 13:15:04 2007
New Revision: 518745

URL: http://svn.apache.org/viewvc?view=rev&rev=518745
Log:
removed the caching of the marshalled form of a message.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/CompositeDestinationBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MarshallAware.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/CompositeDestinationBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/CompositeDestinationBroker.java?view=diff&rev=518745&r1=518744&r2=518745
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/CompositeDestinationBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/CompositeDestinationBroker.java Thu Mar 15 13:15:04 2007
@@ -91,7 +91,6 @@
                 }
                 message.setOriginalDestination(destination);
                 message.setDestination(destinations[i]);
-                message.evictMarshlledForm();
                 next.send(producerExchange, message);
             }
         } else {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=518745&r1=518744&r2=518745
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Mar 15 13:15:04 2007
@@ -480,7 +480,6 @@
                     ActiveMQTopic advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
                     message.setDestination(advisoryTopic);
                     message.setTransactionId(null);
-                    message.evictMarshlledForm();
 
                     // Disable flow control for this since since we don't want to block.
                     boolean originalFlowControl = context.isProducerFlowControl();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?view=diff&rev=518745&r1=518744&r2=518745
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Thu Mar 15 13:15:04 2007
@@ -88,16 +88,10 @@
     }
     
     public synchronized void addMessageLast(MessageReference node) throws Exception{
-        if(node!=null){
-            node.decrementReferenceCount();
-        }
         size++;
     }
     
     public void addMessageFirst(MessageReference node) throws Exception{
-        if(node!=null){
-            node.decrementReferenceCount();
-        }
         size++;
     }
     
@@ -124,6 +118,7 @@
 
     public synchronized MessageReference next(){
         Message result = batchList.removeFirst();
+        result.decrementReferenceCount();
         result.setRegionDestination(regionDestination);
         return result;
     }
@@ -137,10 +132,7 @@
 
     public void recoverMessage(Message message) throws Exception{
         message.setRegionDestination(regionDestination);
-        // only increment if count is zero (could have been cached)
-        if(message.getReferenceCount()==0){
-            message.incrementReferenceCount();
-        }
+        message.incrementReferenceCount();
         batchList.addLast(message);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MarshallAware.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MarshallAware.java?view=diff&rev=518745&r1=518744&r2=518745
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MarshallAware.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MarshallAware.java Thu Mar 15 13:15:04 2007
@@ -19,7 +19,6 @@
 
 import java.io.IOException;
 
-import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.wireformat.WireFormat;
 
 public interface MarshallAware {
@@ -30,6 +29,4 @@
     public void beforeUnmarshall(WireFormat wireFormat) throws IOException;
     public void afterUnmarshall(WireFormat wireFormat) throws IOException;
     
-    public void setCachedMarshalledForm(WireFormat wireFormat, ByteSequence data);
-    public ByteSequence getCachedMarshalledForm(WireFormat wireFormat);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?view=diff&rev=518745&r1=518744&r2=518745
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Thu Mar 15 13:15:04 2007
@@ -79,8 +79,6 @@
     private transient short referenceCount;
     private transient ActiveMQConnection connection;
     private transient org.apache.activemq.broker.region.Destination regionDestination;
-    private transient WireFormat cachedWireFormat;
-    private transient ByteSequence cachedWireFormatData;
 
     private BrokerId [] brokerPath;
     protected boolean droppable = false;
@@ -124,8 +122,6 @@
         copy.arrival = arrival;
         copy.connection = connection;
         copy.regionDestination = regionDestination;
-        copy.cachedWireFormat = cachedWireFormat;
-        copy.cachedWireFormatData = cachedWireFormatData;
         //copying the broker path breaks networks - if a consumer re-uses a consumed
         //message and forwards it on
         //copy.brokerPath = brokerPath;
@@ -544,36 +540,6 @@
     public boolean isMarshallAware() {
         return true;
     }
-
-    synchronized public ByteSequence getCachedMarshalledForm(WireFormat wireFormat) {
-        if( cachedWireFormat == null || !cachedWireFormat.equals(wireFormat) ) {
-            return null;
-        }
-        return cachedWireFormatData;
-    }
-    
-    synchronized public void evictMarshlledForm() {
-        cachedWireFormat = null;
-        cachedWireFormatData = null;
-    }
-
-    synchronized public void setCachedMarshalledForm(WireFormat wireFormat, ByteSequence data) {
-        cachedWireFormat = wireFormat;
-        cachedWireFormatData = data;
-
-        int sizeChange=0;
-        synchronized (this) {
-            if( referenceCount > 0 ) {
-                sizeChange = getSize();
-                this.size=0;
-                sizeChange -= getSize();
-            }
-        }
-        
-        if( sizeChange!=0 && regionDestination!=null )
-            regionDestination.getUsageManager().decreaseUsage(sizeChange);
-        
-    }
         
     public int incrementReferenceCount() {
         int rc;
@@ -586,6 +552,7 @@
         if( rc==1 && regionDestination!=null )
             regionDestination.getUsageManager().increaseUsage(size);
         
+//        System.out.println(" + "+getDestination()+" :::: "+getMessageId()+" "+rc);
         return rc;
     }
     
@@ -599,6 +566,8 @@
         
         if( rc==0 && regionDestination!=null )
             regionDestination.getUsageManager().decreaseUsage(size);
+
+//        System.out.println(" - "+getDestination()+" :::: "+getMessageId()+" "+rc);
         
         return rc;
     }
@@ -610,10 +579,6 @@
                 size += marshalledProperties.getLength();
             if( content!=null )
                 size += content.getLength();
-            if( cachedWireFormatData !=null )
-                size += cachedWireFormatData.getLength() + 12;
-            else 
-                size *= 2; // Estimate what the cached data will add.   
         }
         return size;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?view=diff&rev=518745&r1=518744&r2=518745
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Thu Mar 15 13:15:04 2007
@@ -496,7 +496,6 @@
         if(message.getOriginalTransactionId()==null)
             message.setOriginalTransactionId(message.getTransactionId());
         message.setTransactionId(null);
-        message.evictMarshlledForm();
         return message;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?view=diff&rev=518745&r1=518744&r2=518745
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Thu Mar 15 13:15:04 2007
@@ -223,7 +223,6 @@
                 if( message.getOriginalTransactionId()==null )
                     message.setOriginalTransactionId(message.getTransactionId());
                 message.setTransactionId(null);
-                message.evictMarshlledForm();
 
                 
                 if( !message.isResponseRequired() ) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?view=diff&rev=518745&r1=518744&r2=518745
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Thu Mar 15 13:15:04 2007
@@ -130,9 +130,9 @@
         }
         
         ByteSequence sequence=null;
-        if( ma!=null ) {
-            sequence = ma.getCachedMarshalledForm(this);
-        }
+//        if( ma!=null ) {
+//             sequence = ma.getCachedMarshalledForm(this);
+//        }
         
         if( sequence == null ) {
             
@@ -185,9 +185,9 @@
                 sequence = bytesOut.toByteSequence();
             }
             
-            if( ma!=null ) {
-                ma.setCachedMarshalledForm(this, sequence);
-            }
+//            if( ma!=null ) {
+//                ma.setCachedMarshalledForm(this, sequence);
+//            }
         }
         return sequence;
     }
@@ -204,9 +204,9 @@
         }
         
         Object command = doUnmarshal(bytesIn);
-        if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
-            ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
-        }
+//        if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
+//            ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
+//        }
         return command;
     }
     
@@ -367,7 +367,8 @@
 
         if( o.isMarshallAware() ) {
             MarshallAware ma = (MarshallAware) o;
-            ByteSequence sequence=ma.getCachedMarshalledForm(this);
+            ByteSequence sequence=null; 
+//            sequence=ma.getCachedMarshalledForm(this);
             bs.writeBoolean(sequence!=null);
             if( sequence!=null ) {
                 return 1 + sequence.getLength();           
@@ -389,10 +390,12 @@
         ds.writeByte(type);
 
         if( o.isMarshallAware() && bs.readBoolean() ) {
-                        
-            MarshallAware ma = (MarshallAware) o;
-            ByteSequence sequence=ma.getCachedMarshalledForm(this);
-            ds.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+
+        	// We should not be doing any caching
+        	throw new IOException("Corrupted stream");
+//            MarshallAware ma = (MarshallAware) o;
+//            ByteSequence sequence=ma.getCachedMarshalledForm(this);
+//            ds.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
             
         } else {
             

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java?view=diff&rev=518745&r1=518744&r2=518745
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java Thu Mar 15 13:15:04 2007
@@ -36,7 +36,6 @@
             message.setOriginalTransactionId(message.getTransactionId());               
            
         message.setDestination(deadLetterDestination);
         message.setTransactionId(null);
-        message.evictMarshlledForm();
         boolean originalFlowControl=context.isProducerFlowControl();
         try{
             context.setProducerFlowControl(false);



Mime
View raw message