activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r633976 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: DestinationFactoryImpl.java IndirectMessageReference.java Queue.java TempQueue.java TempQueueRegion.java TempTopic.java Topic.java
Date Wed, 05 Mar 2008 19:00:01 GMT
Author: rajdavies
Date: Wed Mar  5 10:59:58 2008
New Revision: 633976

URL: http://svn.apache.org/viewvc?rev=633976&view=rev
Log:
Define specific region classes for TempQueue and TempTopic

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
  (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?rev=633976&r1=633975&r2=633976&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
Wed Mar  5 10:59:58 2008
@@ -77,18 +77,9 @@
         if (destination.isQueue()) {
             if (destination.isTemporary()) {
                 final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
-                return new Queue(brokerService, destination, null, destinationStatistics,
taskRunnerFactory) {
-
-                    public void addSubscription(ConnectionContext context, Subscription sub)
throws Exception {
-                        // Only consumers on the same connection can consume
-                        // from
-                        // the temporary destination
-                        if (!tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId()))
{
-                            throw new JMSException("Cannot subscribe to remote temporary
destination: " + tempDest);
-                        }
-                        super.addSubscription(context, sub);
-                    };
-                };
+                Queue queue = new TempQueue(brokerService, destination, null, destinationStatistics,
taskRunnerFactory);
+                queue.initialize();
+                return queue;
             } else {
                 MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination);
                 Queue queue = new Queue(brokerService, destination, store, destinationStatistics,
taskRunnerFactory);
@@ -97,18 +88,10 @@
                 return queue;
             }
         } else if (destination.isTemporary()) {
-            final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
-            return new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory)
{
-
-                public void addSubscription(ConnectionContext context, Subscription sub)
throws Exception {
-                    // Only consumers on the same connection can consume from
-                    // the temporary destination
-                    if (!tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId()))
{
-                        throw new JMSException("Cannot subscribe to remote temporary destination:
" + tempDest);
-                    }
-                    super.addSubscription(context, sub);
-                };
-            };
+            
+            Topic topic = new Topic(brokerService, destination, null, destinationStatistics,
taskRunnerFactory);
+            topic.initialize();
+            return topic;
         } else {
             TopicMessageStore store = null;
             if (!AdvisorySupport.isAdvisoryTopic(destination)) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=633976&r1=633975&r2=633976&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
Wed Mar  5 10:59:58 2008
@@ -16,12 +16,9 @@
  */
 package org.apache.activemq.broker.region;
 
-import java.io.IOException;
-
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.store.MessageStore;
 
 /**
  * Keeps track of a message that is flowing through the Broker. This object may
@@ -32,21 +29,6 @@
  */
 public class IndirectMessageReference implements QueueMessageReference {
 
-    /** The destination that is managing the message */
-    private final Destination regionDestination;
-
-    private final MessageStore destinationStore;
-
-    /** The id of the message is always valid */
-    private final MessageId messageId;
-    /** Is the message persistent? */
-    private final boolean persistent;
-    private final String groupID;
-    private final int groupSequence;
-    private final ConsumerId targetConsumerId;
-
-    /** The number of times the message has been delivered. */
-    private short redeliveryCounter;
     /** The subscription that has locked the message */
     private LockOwner lockOwner;
     /** Has the message been dropped? */
@@ -54,76 +36,44 @@
     /** Has the message been acked? */
     private boolean acked;
     /** Direct reference to the message */
-    private Message message;
-    /** The number of times the message has requested being hardened */
-    private int referenceCount;
-    /** the size of the message * */
-    private int cachedSize;
-    /** the expiration time of the message */
-    private long expiration;
-
-    public IndirectMessageReference(Queue destination, MessageStore destinationStore, Message
message) {
-        this.regionDestination = destination;
-        this.destinationStore = destinationStore;
+    private final Message message;
+    
+    /**
+     * @param message
+     */
+    public IndirectMessageReference(final Message message) {
         this.message = message;
-        this.messageId = message.getMessageId();
-        this.persistent = message.isPersistent() && destination.getMessageStore()
!= null;
-        this.groupID = message.getGroupID();
-        this.groupSequence = message.getGroupSequence();
-        this.targetConsumerId = message.getTargetConsumerId();
-        this.expiration = message.getExpiration();
-
-        this.referenceCount = 1;
-        message.incrementReferenceCount();
-        this.cachedSize = message.getSize();
+        message.getMessageId();
+        message.getGroupID();
+        message.getGroupSequence();
     }
 
-    public synchronized Message getMessageHardRef() {
+    public Message getMessageHardRef() {
         return message;
     }
 
-    public synchronized int getReferenceCount() {
-        return referenceCount;
+    public int getReferenceCount() {
+        return message.getReferenceCount();
     }
 
-    public synchronized int incrementReferenceCount() {
-        int rc = ++referenceCount;
-        if (persistent && rc == 1 && message == null) {
-
-            try {
-                message = destinationStore.getMessage(messageId);
-                if (message == null) {
-                    dropped = true;
-                } else {
-                    message.setRegionDestination(regionDestination);
-                    message.incrementReferenceCount();
-                }
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-        return rc;
+    public int incrementReferenceCount() {
+        return message.incrementReferenceCount();
     }
 
-    public synchronized int decrementReferenceCount() {
-        int rc = --referenceCount;
-        if (persistent && rc == 0 && message != null) {
-            message.decrementReferenceCount();
-            // message=null;
-        }
-        return rc;
+    public int decrementReferenceCount() {
+        return message.decrementReferenceCount();
     }
 
-    public synchronized Message getMessage() {
+    public Message getMessage() {
         return message;
     }
 
     public String toString() {
-        return "Message " + messageId + " dropped=" + dropped + " locked=" + (lockOwner !=
null);
+        return "Message " + message.getMessageId() + " dropped=" + dropped + " locked=" +
(lockOwner != null);
     }
 
-    public synchronized void incrementRedeliveryCounter() {
-        this.redeliveryCounter++;
+    public void incrementRedeliveryCounter() {
+        message.incrementRedeliveryCounter();
     }
 
     public synchronized boolean isDropped() {
@@ -133,10 +83,7 @@
     public synchronized void drop() {
         dropped = true;
         lockOwner = null;
-        if (!persistent && message != null) {
-            message.decrementReferenceCount();
-            message = null;
-        }
+        message.decrementReferenceCount();
     }
 
     public boolean lock(LockOwner subscription) {
@@ -159,20 +106,20 @@
         return lockOwner;
     }
 
-    public synchronized int getRedeliveryCounter() {
-        return redeliveryCounter;
+    public int getRedeliveryCounter() {
+        return message.getRedeliveryCounter();
     }
 
     public MessageId getMessageId() {
-        return messageId;
+        return message.getMessageId();
     }
 
     public Destination getRegionDestination() {
-        return regionDestination;
+        return message.getRegionDestination();
     }
 
     public boolean isPersistent() {
-        return persistent;
+        return message.isPersistent();
     }
 
     public synchronized boolean isLocked() {
@@ -188,34 +135,26 @@
     }
 
     public String getGroupID() {
-        return groupID;
+        return message.getGroupID();
     }
 
     public int getGroupSequence() {
-        return groupSequence;
+        return message.getGroupSequence();
     }
 
     public ConsumerId getTargetConsumerId() {
-        return targetConsumerId;
+        return message.getTargetConsumerId();
     }
 
     public long getExpiration() {
-        return expiration;
+        return message.getExpiration();
     }
 
     public boolean isExpired() {
-        long expireTime = getExpiration();
-        if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
-            return true;
-        }
-        return false;
+        return message.isExpired();
     }
 
     public synchronized int getSize() {
-        Message msg = message;
-        if (msg != null) {
-            return msg.getSize();
-        }
-        return cachedSize;
+       return message.getSize();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=633976&r1=633975&r2=633976&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Mar  5 10:59:58 2008
@@ -77,18 +77,18 @@
  * @version $Revision: 1.28 $
  */
 public class Queue extends BaseDestination implements Task {
-    private final Log log;
-    private final List<Subscription> consumers = new ArrayList<Subscription>(50);
-    private PendingMessageCursor messages;
-    private final LinkedHashMap<MessageId,MessageReference> pagedInMessages = new LinkedHashMap<MessageId,MessageReference>();
+    protected final Log log;
+    protected TaskRunner taskRunner;    
+    protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
+    protected PendingMessageCursor messages;
+    private final LinkedHashMap<MessageId,QueueMessageReference> pagedInMessages =
new LinkedHashMap<MessageId,QueueMessageReference>();
     private MessageGroupMap messageGroupOwners;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
     private final Object sendLock = new Object();
-    private final ExecutorService executor;
-    private final TaskRunner taskRunner;    
-    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
+    private ExecutorService executor;
+    protected final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
     private final ReentrantLock dispatchLock = new ReentrantLock();
     private boolean useConsumerPriority=true;
     private boolean strictOrderDispatch=false;
@@ -110,28 +110,20 @@
                  TaskRunnerFactory taskFactory) throws Exception {
         super(brokerService, store, destination, parentStats);
         
-        if (destination.isTemporary() || broker == null || store==null ) {
-            this.messages = new VMPendingMessageCursor();
-        } else {
-            this.messages = new StoreQueueCursor(broker,this);
-        }
-       
-        this.executor =  Executors.newSingleThreadExecutor(new ThreadFactory() {
-            public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable, "QueueThread:"+destination);
-                thread.setDaemon(true);
-                thread.setPriority(Thread.NORM_PRIORITY);
-                return thread;
-            }
-        });
-           
-        this.taskRunner = new DeterministicTaskRunner(this.executor,this);
+        
         this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
         this.dispatchSelector=new QueueDispatchSelector(destination);
        
     }
         
     public void initialize() throws Exception {
+        if (this.messages == null) {
+            if (destination.isTemporary() || broker == null || store == null) {
+                this.messages = new VMPendingMessageCursor();
+            } else {
+                this.messages = new StoreQueueCursor(broker, this);
+            }
+        }
         // If a VMPendingMessageCursor don't use the default Producer System Usage
         // since it turns into a shared blocking queue which can lead to a network deadlock.
 
         // If we are ccursoring to disk..it's not and issue because it does not block due

@@ -140,6 +132,18 @@
             this.systemUsage = brokerService.getSystemUsage();
             memoryUsage.setParent(systemUsage.getMemoryUsage());
         }
+        
+       
+        this.executor =  Executors.newSingleThreadExecutor(new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "QueueThread:"+destination);
+                thread.setDaemon(true);
+                thread.setPriority(Thread.NORM_PRIORITY);
+                return thread;
+            }
+        });
+           
+        this.taskRunner = new DeterministicTaskRunner(this.executor,this);
         super.initialize();
         if (store != null) {
             // Restore the persistent messages.
@@ -222,10 +226,7 @@
                 // Add all the matching messages in the queue to the
                 // subscription.
                 
-                for (Iterator<MessageReference> i = pagedInMessages.values()
-                        .iterator(); i.hasNext();) {
-                    QueueMessageReference node = (QueueMessageReference) i
-                            .next();
+                for (QueueMessageReference node:pagedInMessages.values()){
                     if (!node.isDropped() && !node.isAcked() && (!node.isDropped()
||sub.getConsumerInfo().isBrowser())) {
                         msgContext.setMessageReference(node);
                         if (sub.matches(node, msgContext)) {
@@ -274,11 +275,8 @@
                 // redeliver inflight messages
                 sub.remove(context, this);
 
-                List<MessageReference> list = new ArrayList<MessageReference>();
-                for (Iterator<MessageReference> i = pagedInMessages.values()
-                        .iterator(); i.hasNext();) {
-                    QueueMessageReference node = (QueueMessageReference) i
-                            .next();
+                List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
+                for (QueueMessageReference node:pagedInMessages.values()){
                     if (!node.isDropped() && !node.isAcked()
                             && node.getLockOwner() == sub) {
                         if (node.unlock()) {
@@ -583,9 +581,8 @@
 
     // Implementation methods
     // -------------------------------------------------------------------------
-    private MessageReference createMessageReference(Message message) {
-        MessageReference result = new IndirectMessageReference(this, store, message);
-        result.decrementReferenceCount();
+    private QueueMessageReference createMessageReference(Message message) {
+        QueueMessageReference result = new IndirectMessageReference(message);
         return result;
     }
 
@@ -597,18 +594,17 @@
             log.error("caught an exception browsing " + this, e);
         }
         synchronized (pagedInMessages) {
-            for (Iterator<MessageReference> i = pagedInMessages.values().iterator();
i.hasNext();) {
-                MessageReference r = i.next();
-                r.incrementReferenceCount();
+            for (QueueMessageReference node:pagedInMessages.values()){
+                node.incrementReferenceCount();
                 try {
-                    Message m = r.getMessage();
+                    Message m = node.getMessage();
                     if (m != null) {
                         l.add(m);
                     }
                 } catch (IOException e) {
                     log.error("caught an exception browsing " + this, e);
                 } finally {
-                    r.decrementReferenceCount();
+                    node.decrementReferenceCount();
                 }
             }
         }
@@ -886,7 +882,6 @@
                 log.error("Failed to page in more queue messages ", e);
             }
         }
-        
         synchronized(messagesWaitingForSpace) {
                while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull())
{
                    Runnable op = messagesWaitingForSpace.removeFirst();
@@ -921,7 +916,7 @@
         };
     }
 
-    protected void removeMessage(ConnectionContext c, IndirectMessageReference r) throws
IOException {
+    protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException
{
         MessageAck ack = new MessageAck();
         ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
         ack.setDestination(destination);
@@ -955,7 +950,7 @@
         wakeup();
     }
     
-    final void wakeup() {
+    protected void wakeup() {
         try {
             taskRunner.wakeup();
         } catch (InterruptedException e) {
@@ -963,8 +958,8 @@
         }
     }
 
-    private List<MessageReference> doPageIn(boolean force) throws Exception {
-        List<MessageReference> result = null;
+    private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
+        List<QueueMessageReference> result = null;
         dispatchLock.lock();
         try{
         
@@ -972,7 +967,7 @@
             if ((force || !consumers.isEmpty()) && toPageIn > 0) {
                 messages.setMaxBatchSize(toPageIn);
                 int count = 0;
-                result = new ArrayList<MessageReference>(toPageIn);
+                result = new ArrayList<QueueMessageReference>(toPageIn);
                 synchronized (messages) {
                     try {
                         messages.reset();
@@ -980,8 +975,8 @@
                             MessageReference node = messages.next();
                             messages.remove();
                             if (!broker.isExpired(node)) {
-                                node = createMessageReference(node.getMessage());
-                                result.add(node);
+                                QueueMessageReference ref = createMessageReference(node.getMessage());
+                                result.add(ref);
                                 count++;
                             } else {
                                 broker.messageExpired(createConnectionContext(),
@@ -994,7 +989,7 @@
                     }
                 }
                 synchronized (pagedInMessages) {
-                    for(MessageReference ref:result) {
+                    for(QueueMessageReference ref:result) {
                         pagedInMessages.put(ref.getMessageId(), ref);
                     }
                 }
@@ -1005,7 +1000,7 @@
         return result;
     }
     
-    private void doDispatch(List<MessageReference> list) throws Exception {
+    private void doDispatch(List<QueueMessageReference> list) throws Exception {
         if (list != null) {
             synchronized (consumers) {
                 for (MessageReference node : list) {
@@ -1053,7 +1048,7 @@
         pageInMessages(true);
     }
 
-    private void pageInMessages(boolean force) throws Exception {
+    protected void pageInMessages(boolean force) throws Exception {
             doDispatch(doPageIn(force));
     }
     

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=633976&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
Wed Mar  5 10:59:58 2008
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.thread.TaskRunnerFactory;
+
+/**
+ * The Queue is a List of MessageEntry objects that are dispatched to matching
+ * subscriptions.
+ * 
+ * @version $Revision: 1.28 $
+ */
+public class TempQueue extends Queue{
+    private final ActiveMQTempDestination tempDest;
+    private TaskRunnerFactory taskFactory;
+    
+    /**
+     * @param brokerService
+     * @param destination
+     * @param store
+     * @param parentStats
+     * @param taskFactory
+     * @throws Exception
+     */
+    public TempQueue(BrokerService brokerService,
+            ActiveMQDestination destination, MessageStore store,
+            DestinationStatistics parentStats, TaskRunnerFactory taskFactory)
+            throws Exception {
+        super(brokerService, destination, store, parentStats, taskFactory);
+        this.tempDest = (ActiveMQTempDestination) destination;
+        this.taskFactory=taskFactory;
+    }
+    
+    public void initialize() throws Exception {
+        this.messages=new VMPendingMessageCursor();
+        this.systemUsage = brokerService.getSystemUsage();
+        memoryUsage.setParent(systemUsage.getMemoryUsage());           
+        this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue:  " + destination.getPhysicalName());
+    }
+    
+    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception
{
+        // Only consumers on the same connection can consume from
+        // the temporary destination
+        // However, we could have failed over - and we do this
+        // check client side anyways ....
+        if (!context.isFaultTolerant()
+                && (!context.isNetworkConnection() && !tempDest
+                        .getConnectionId().equals(
+                                sub.getConsumerInfo().getConsumerId()
+                                        .getConnectionId()))) {
+
+            tempDest.setConnectionId(sub.getConsumerInfo().getConsumerId().getConnectionId());
+            log.debug(" changed ownership of " + this + " to "+ tempDest.getConnectionId());
+        }
+        super.addSubscription(context, sub);
+    } 
+    
+    protected void wakeup() {
+        boolean result = false;
+        synchronized (messages) {
+            result = !messages.isEmpty();
+        }
+        if (result) {
+            try {
+               pageInMessages(false);
+               
+            } catch (Throwable e) {
+                log.error("Failed to page in more queue messages ", e);
+            }
+        }
+        if (!messagesWaitingForSpace.isEmpty()) {
+            try {
+                taskRunner.wakeup();
+            } catch (InterruptedException e) {
+                log.warn("Task Runner failed to wakeup ", e);
+            }
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=633976&r1=633975&r2=633976&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
Wed Mar  5 10:59:58 2008
@@ -44,27 +44,10 @@
         this.brokerService = brokerService;
     }
 
-    protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination
destination) throws Exception {
-        final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
-        return new Queue(brokerService, destination, null, destinationStatistics, taskRunnerFactory)
{
-
-            public void addSubscription(ConnectionContext context, Subscription sub) throws
Exception {
-                // Only consumers on the same connection can consume from
-                // the temporary destination
-                // However, we could have failed over - and we do this
-                // check client side anyways ....
-                if (!context.isFaultTolerant()
-                        && (!context.isNetworkConnection() && !tempDest
-                                .getConnectionId().equals(
-                                        sub.getConsumerInfo().getConsumerId()
-                                                .getConnectionId()))) {
-
-                    tempDest.setConnectionId(sub.getConsumerInfo().getConsumerId().getConnectionId());
-                    LOG.debug(" changed ownership of " + this + " to "+ tempDest.getConnectionId());
-                }
-                super.addSubscription(context, sub);
-            };
-        };
+    protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination
destination) throws Exception {  
+        TempQueue result = new TempQueue(brokerService, destination, null, destinationStatistics,
taskRunnerFactory);
+        result.initialize();
+        return result;
     }
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
throws InvalidSelectorException {

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java?rev=633976&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java
Wed Mar  5 10:59:58 2008
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunnerFactory;
+
+/**
+ * The Topic is a destination that sends a copy of a message to every active
+ * Subscription registered.
+ * 
+ * @version $Revision: 1.21 $
+ */
+public class TempTopic  extends Topic  implements Task{
+    private final ActiveMQTempDestination tempDest;
+    /**
+     * @param brokerService
+     * @param destination
+     * @param store
+     * @param parentStats
+     * @param taskFactory
+     * @throws Exception
+     */
+    public TempTopic(BrokerService brokerService,
+            ActiveMQDestination destination, TopicMessageStore store,
+            DestinationStatistics parentStats, TaskRunnerFactory taskFactory)
+            throws Exception {
+        super(brokerService, destination, store, parentStats, taskFactory);
+        this.tempDest = (ActiveMQTempDestination) destination;
+
+    }
+    
+    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception
{
+        // Only consumers on the same connection can consume from
+        // the temporary destination
+        // However, we could have failed over - and we do this
+        // check client side anyways ....
+        if (!context.isFaultTolerant()
+                && (!context.isNetworkConnection() && !tempDest
+                        .getConnectionId().equals(
+                                sub.getConsumerInfo().getConsumerId()
+                                        .getConnectionId()))) {
+
+            tempDest.setConnectionId(sub.getConsumerInfo().getConsumerId().getConnectionId());
+            log.debug(" changed ownership of " + this + " to "+ tempDest.getConnectionId());
+        }
+        super.addSubscription(context, sub);
+    } 
+    
+    public void initialize() {
+    }
+   
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=633976&r1=633975&r2=633976&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Wed Mar  5 10:59:58 2008
@@ -65,7 +65,7 @@
  * @version $Revision: 1.21 $
  */
 public class Topic  extends BaseDestination  implements Task{
-    private static final Log LOG = LogFactory.getLog(Topic.class);
+    protected final Log log;
     private final TopicMessageStore topicStore;
     protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
     protected final Valve dispatchValve = new Valve(true);   
@@ -90,6 +90,7 @@
                  TaskRunnerFactory taskFactory) throws Exception {
         super(brokerService, store, destination, parentStats);
         this.topicStore=store;
+        this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
         //set default subscription recovery policy
         if (destination.isTemporary() || AdvisorySupport.isAdvisoryTopic(destination) ){
         	 subscriptionRecoveryPolicy= new NoSubscriptionRecoveryPolicy();
@@ -345,15 +346,15 @@
                     if (count > 2 && context.isInTransaction()) {
                         count =0;
                         int size = context.getTransaction().size();
-                        LOG.warn("Waiting for space to send  transacted message - transaction
elements = " + size + " need more space to commit. Message = " + message);
+                        log.warn("Waiting for space to send  transacted message - transaction
elements = " + size + " need more space to commit. Message = " + message);
                     }
                 }
 
                 // The usage manager could have delayed us by the time
                 // we unblock the message could have expired..
                 if (message.isExpired()) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Expired message: " + message);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Expired message: " + message);
                     }
                     return;
                 }
@@ -499,7 +500,7 @@
                 }
             }
         } catch (Throwable e) {
-            LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(),
e);
+            log.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(),
e);
         }
         return result.toArray(new Message[result.size()]);
     }



Mime
View raw message