activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r633800 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/ broker/region/ broker/region/cursors/ broker/region/policy/ broker/region/virtual/ filter/ store/amq/ store/journal/
Date Wed, 05 Mar 2008 09:35:35 GMT
Author: rajdavies
Date: Wed Mar  5 01:35:31 2008
New Revision: 633800

URL: http://svn.apache.org/viewvc?rev=633800&view=rev
Log:
Added NonCachedMessageContext

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/NonCachedMessageEvaluationContext.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?rev=633800&r1=633799&r2=633800&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
Wed Mar  5 01:35:31 2008
@@ -55,14 +55,20 @@
     private boolean networkConnection;
     private boolean faultTolerant;
     private final AtomicBoolean stopping = new AtomicBoolean();
-    private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
+    private final MessageEvaluationContext messageEvaluationContext;
     private boolean dontSendReponse;
     private boolean clientMaster = true;
 
     public ConnectionContext() {
+    	this.messageEvaluationContext = new MessageEvaluationContext();
     }
-
+    
+    public ConnectionContext(MessageEvaluationContext messageEvaluationContext) {
+    	this.messageEvaluationContext=messageEvaluationContext;
+    }
+    
     public ConnectionContext(ConnectionInfo info) {
+    	this();
         setClientId(info.getClientId());
         setUserName(info.getUserName());
         setConnectionId(info.getConnectionId());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=633800&r1=633799&r2=633800&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Mar  5 01:35:31 2008
@@ -57,6 +57,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
@@ -195,7 +196,7 @@
         try {
             sub.add(context, this);
             destinationStatistics.getConsumers().increment();
-            MessageEvaluationContext msgContext = new MessageEvaluationContext();
+            MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
 
             // needs to be synchronized - so no contention with dispatching
             synchronized (consumers) {
@@ -940,7 +941,7 @@
 
     
     protected ConnectionContext createConnectionContext() {
-        ConnectionContext answer = new ConnectionContext();
+        ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
         answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
         return answer;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=633800&r1=633799&r2=633800&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Wed Mar  5 01:35:31 2008
@@ -200,8 +200,17 @@
         synchronized (clientIdSet) {
             ConnectionContext oldContext = clientIdSet.get(clientId);
             if (oldContext != null) {
+            	if (context.isFaultTolerant() || context.isNetworkConnection()){
+            		//remove the old connection
+            		try{
+            			removeConnection(oldContext, info, new Exception("remove stale client"));
+            		}catch(Exception e){
+            			LOG.warn("Failed to remove stale connection ",e);
+            		}
+            	}else{
                 throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
                                                    + oldContext.getConnection().getRemoteAddress());
+            	}
             } else {
                 clientIdSet.put(clientId, context);
             }
@@ -673,7 +682,6 @@
     public void sendToDeadLetterQueue(ConnectionContext context,
 	        MessageReference node){
 		try{
-			boolean sent=false;
 			if(node!=null){
 				Message message=node.getMessage();
 				if(message!=null&&node.getRegionDestination()!=null){
@@ -703,17 +711,9 @@
 							}
 							BrokerSupport.resend(context,message,
 							        deadLetterDestination);
-							sent=true;
 						}
-					}else {
-					  //don't want to warn about failing to send 
-					  // if there isn't a dead letter strategy 
-					   sent=true;
 					}
 				}
-			}
-			if(sent==false){
-				LOG.warn("Failed to send "+node+" to DLQ");
 			}
 		}catch(Exception e){
 			LOG.warn("Caught an exception sending to DLQ: "+node,e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=633800&r1=633799&r2=633800&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Wed Mar  5 01:35:31 2008
@@ -47,6 +47,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
@@ -213,7 +214,7 @@
                 topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
             }
 
-            final MessageEvaluationContext msgContext = new MessageEvaluationContext();
+            final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
             msgContext.setDestination(destination);
             if (subscription.isRecoveryRequired()) {
                 topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=633800&r1=633799&r2=633800&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Wed Mar  5 01:35:31 2008
@@ -28,6 +28,7 @@
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.QueueMessageReference;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.kaha.CommandMarshaller;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.Store;
@@ -393,6 +394,6 @@
         if (LOG.isDebugEnabled()) {
             LOG.debug("Discarding message " + message);
         }
-        broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(), message);
+        broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(new NonCachedMessageEvaluationContext()), message);
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=633800&r1=633799&r2=633800&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
Wed Mar  5 01:35:31 2008
@@ -25,6 +25,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -78,7 +79,7 @@
     }
     
     public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
-        MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
+        MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext();
         messageEvaluationContext.setMessageReference(message);
         if (this.subscription.matches(message, messageEvaluationContext)) {
             return super.recoverMessage(message, cached);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java?rev=633800&r1=633799&r2=633800&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SimpleDispatchSelector.java
Wed Mar  5 01:35:31 2008
@@ -7,6 +7,7 @@
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 
 /**
  * Simple dispatch policy that determines if a message can be sent to a subscription
@@ -26,7 +27,7 @@
     }
 
     public boolean canDispatch(Subscription subscription, MessageReference node) throws Exception {
-        MessageEvaluationContext msgContext = new MessageEvaluationContext();
+        MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
         msgContext.setDestination(this.destination);
         msgContext.setMessageReference(node);
         return subscription.matches(node, msgContext);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java?rev=633800&r1=633799&r2=633800&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
Wed Mar  5 01:35:31 2008
@@ -25,6 +25,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 
 /**
  * Represents a composite {@link Destination} where send()s are replicated to
@@ -55,7 +56,7 @@
             if (value instanceof FilteredDestination) {
                 FilteredDestination filteredDestination = (FilteredDestination)value;
                 if (messageContext == null) {
-                    messageContext = new MessageEvaluationContext();
+                    messageContext = new NonCachedMessageEvaluationContext();
                     messageContext.setMessageReference(message);
                 }
                 messageContext.setDestination(filteredDestination.getDestination());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java?rev=633800&r1=633799&r2=633800&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/MessageEvaluationContext.java
Wed Mar  5 01:35:31 2008
@@ -33,11 +33,11 @@
  */
 public class MessageEvaluationContext {
 
-    private MessageReference messageReference;
-    private boolean loaded;
-    private boolean dropped;
-    private Message message;
-    private ActiveMQDestination destination;
+	protected MessageReference messageReference;
+    protected boolean loaded;
+    protected boolean dropped;
+    protected Message message;
+    protected ActiveMQDestination destination;
 
     public MessageEvaluationContext() {
     }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/NonCachedMessageEvaluationContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/NonCachedMessageEvaluationContext.java?rev=633800&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/NonCachedMessageEvaluationContext.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/NonCachedMessageEvaluationContext.java
Wed Mar  5 01:35:31 2008
@@ -0,0 +1,42 @@
+package org.apache.activemq.filter;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.Message;
+
+/**
+ * NonCached version of the MessageEvaluationContext
+ * 
+ * @version $Revision: 1.4 $
+ */
+public class NonCachedMessageEvaluationContext extends MessageEvaluationContext {
+
+	
+	public Message getMessage() throws IOException {
+        return messageReference != null ? messageReference.getMessage():null;
+    }
+
+    public void setMessageReference(MessageReference messageReference) {
+        this.messageReference = messageReference;
+    }
+
+    
+    protected void clearMessageCache() {
+    }
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=633800&r1=633799&r2=633800&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
Wed Mar  5 01:35:31 2008
@@ -38,6 +38,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.kaha.impl.async.Location;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
@@ -87,7 +88,7 @@
         this.transactionStore = adapter.getTransactionStore();
         this.referenceStore = referenceStore;
         this.destination = destination;
-        this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
+        this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
         asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() {
 
             public boolean iterate() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=633800&r1=633799&r2=633800&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
Wed Mar  5 01:35:31 2008
@@ -45,6 +45,7 @@
 import org.apache.activemq.command.JournalTrace;
 import org.apache.activemq.command.JournalTransaction;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
 import org.apache.activemq.kaha.impl.async.Location;
 import org.apache.activemq.kaha.impl.index.hash.HashIndex;
@@ -502,7 +503,7 @@
         int redoCounter = 0;
         LOG.info("Journal Recovery Started from: " + asyncDataManager);
         long start = System.currentTimeMillis();
-        ConnectionContext context = new ConnectionContext();
+        ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
         // While we have records in the journal.
         while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
             ByteSequence data = asyncDataManager.read(pos);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?rev=633800&r1=633799&r2=633800&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
Wed Mar  5 01:35:31 2008
@@ -33,6 +33,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -75,7 +76,7 @@
         this.transactionStore = adapter.getTransactionStore();
         this.longTermStore = checkpointStore;
         this.destination = destination;
-        this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
+        this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
     }
 
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=633800&r1=633799&r2=633800&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
Wed Mar  5 01:35:31 2008
@@ -49,6 +49,7 @@
 import org.apache.activemq.command.JournalTransaction;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -469,7 +470,7 @@
         int transactionCounter = 0;
 
         LOG.info("Journal Recovery Started from: " + journal);
-        ConnectionContext context = new ConnectionContext();
+        ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
 
         // While we have records in the journal.
         while ((pos = journal.getNextRecordLocation(pos)) != null) {



Mime
View raw message