activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r956918 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
Date Tue, 22 Jun 2010 15:20:50 GMT
Author: rajdavies
Date: Tue Jun 22 15:20:49 2010
New Revision: 956918

URL: http://svn.apache.org/viewvc?rev=956918&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2790

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/FifoMessageDispatchChannel.java
  (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=956918&r1=956917&r2=956918&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Tue Jun 22 15:20:49 2010
@@ -191,6 +191,7 @@ public class ActiveMQConnection implemen
     protected volatile CountDownLatch transportInterruptionProcessingComplete;
     private long consumerFailoverRedeliveryWaitPeriod;
     private final Scheduler scheduler;
+    private boolean messagePrioritySupported=true;
 
     /**
      * Construct an <code>ActiveMQConnection</code>
@@ -1433,6 +1434,20 @@ public class ActiveMQConnection implemen
     public void setAlwaysSyncSend(boolean alwaysSyncSend) {
         this.alwaysSyncSend = alwaysSyncSend;
     }
+    
+    /**
+     * @return the messagePrioritySupported
+     */
+    public boolean isMessagePrioritySupported() {
+        return this.messagePrioritySupported;
+    }
+
+    /**
+     * @param messagePrioritySupported the messagePrioritySupported to set
+     */
+    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
+        this.messagePrioritySupported = messagePrioritySupported;
+    }
 
     /**
      * Cleans up this connection so that it's state is as if the connection was

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=956918&r1=956917&r2=956918&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
Tue Jun 22 15:20:49 2010
@@ -115,6 +115,7 @@ public class ActiveMQConnectionFactory e
     private long consumerFailoverRedeliveryWaitPeriod = 0;
     private boolean checkForDuplicates = true;
     private ClientInternalExceptionListener clientInternalExceptionListener;
+    private boolean messagePrioritySupported = true;
 
     // /////////////////////////////////////////////
     //
@@ -318,6 +319,7 @@ public class ActiveMQConnectionFactory e
         connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
         connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
         connection.setCheckForDuplicates(isCheckForDuplicates());
+        connection.setMessagePrioritySupported(isMessagePrioritySupported());
         if (transportListener != null) {
             connection.addTransportListener(transportListener);
         }
@@ -583,6 +585,20 @@ public class ActiveMQConnectionFactory e
     public void setSendAcksAsync(boolean sendAcksAsync) {
         this.sendAcksAsync = sendAcksAsync;
     }
+    
+    /**
+     * @return the messagePrioritySupported
+     */
+    public boolean isMessagePrioritySupported() {
+        return this.messagePrioritySupported;
+    }
+
+    /**
+     * @param messagePrioritySupported the messagePrioritySupported to set
+     */
+    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
+        this.messagePrioritySupported = messagePrioritySupported;
+    }
 
 
     /**
@@ -685,6 +701,7 @@ public class ActiveMQConnectionFactory e
         props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
         props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
         props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
+        props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
     }
 
     public boolean isUseCompression() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java?rev=956918&r1=956917&r2=956918&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQInputStream.java
Tue Jun 22 15:20:49 2010
@@ -20,11 +20,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
-
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
-
 import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -47,7 +45,7 @@ public class ActiveMQInputStream extends
     private final ActiveMQConnection connection;
     private final ConsumerInfo info;
     // These are the messages waiting to be delivered to the client
-    private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
+    private final MessageDispatchChannel unconsumedMessages = new FifoMessageDispatchChannel();
 
     private int deliveredCounter;
     private MessageDispatch lastDelivered;
@@ -113,6 +111,7 @@ public class ActiveMQInputStream extends
         unconsumedMessages.start();
     }
 
+    @Override
     public void close() throws IOException {
         if (!unconsumedMessages.isClosed()) {
             try {
@@ -172,6 +171,7 @@ public class ActiveMQInputStream extends
         }
     }
 
+    @Override
     public int read() throws IOException {
         fillBuffer();
         if (eosReached || buffer.length == 0) {
@@ -181,6 +181,7 @@ public class ActiveMQInputStream extends
         return buffer[pos++] & 0xff;
     }
 
+    @Override
     public int read(byte[] b, int off, int len) throws IOException {
         fillBuffer();
         if (eosReached || buffer.length == 0) {
@@ -241,6 +242,7 @@ public class ActiveMQInputStream extends
         unconsumedMessages.enqueue(md);
     }
 
+    @Override
     public String toString() {
         return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" +
producerId + " }";
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=956918&r1=956917&r2=956918&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Tue Jun 22 15:20:49 2010
@@ -114,7 +114,7 @@ public class ActiveMQMessageConsumer imp
     protected final ConsumerInfo info;
 
     // These are the messages waiting to be delivered to the client
-    protected final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
+    protected final MessageDispatchChannel unconsumedMessages;
 
     // The are the messages that were delivered to the consumer but that have
     // not been acknowledged. It's kept in reverse order since we
@@ -198,6 +198,11 @@ public class ActiveMQMessageConsumer imp
                 throw new JMSException("Cannot have a prefetch size less than zero");
             }
         }
+        if (session.connection.isMessagePrioritySupported()) {
+            this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
+        }else {
+            this.unconsumedMessages = new FifoMessageDispatchChannel();
+        }
 
         this.session = session;
         this.scheduler = session.getScheduler();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=956918&r1=956917&r2=956918&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Tue Jun 22 15:20:49 2010
@@ -18,9 +18,7 @@
 package org.apache.activemq;
 
 import java.util.List;
-
 import javax.jms.JMSException;
-
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.thread.Task;
@@ -39,14 +37,19 @@ import org.apache.commons.logging.LogFac
 public class ActiveMQSessionExecutor implements Task {
     private static final Log LOG = LogFactory.getLog(ActiveMQSessionExecutor.class);
 
-    private ActiveMQSession session;
-    private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
+    private final ActiveMQSession session;
+    private final MessageDispatchChannel messageQueue;
     private boolean dispatchedBySessionPool;
     private volatile TaskRunner taskRunner;
     private boolean startedOrWarnedThatNotStarted;
 
     ActiveMQSessionExecutor(ActiveMQSession session) {
         this.session = session;
+        if (this.session.connection.isMessagePrioritySupported()) {
+           this.messageQueue = new SimplePriorityMessageDispatchChannel();
+        }else {
+            this.messageQueue = new FifoMessageDispatchChannel();
+        }
     }
 
     void setDispatchedBySessionPool(boolean value) {

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/FifoMessageDispatchChannel.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/FifoMessageDispatchChannel.java?rev=956918&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/FifoMessageDispatchChannel.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/FifoMessageDispatchChannel.java
Tue Jun 22 15:20:49 2010
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.activemq.command.MessageDispatch;
+
+public class FifoMessageDispatchChannel implements MessageDispatchChannel {
+
+    private final Object mutex = new Object();
+    private final LinkedList<MessageDispatch> list;
+    private boolean closed;
+    private boolean running;
+
+    public FifoMessageDispatchChannel() {
+        this.list = new LinkedList<MessageDispatch>();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq.command.MessageDispatch)
+     */
+    public void enqueue(MessageDispatch message) {
+        synchronized (mutex) {
+            list.addLast(message);
+            mutex.notify();
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq.command.MessageDispatch)
+     */
+    public void enqueueFirst(MessageDispatch message) {
+        synchronized (mutex) {
+            list.addFirst(message);
+            mutex.notify();
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#isEmpty()
+     */
+    public boolean isEmpty() {
+        synchronized (mutex) {
+            return list.isEmpty();
+        }
+    }
+
+    /**
+     * Removes the head message: timeout 0 never waits, -1 waits until one can be delivered, else waits at most once.
+     */
+    public MessageDispatch dequeue(long timeout) throws InterruptedException {
+        synchronized (mutex) {
+            // Wait until the consumer is ready to deliver messages.
+            while (timeout != 0 && !closed && (list.isEmpty() || !running)) {
+                if (timeout == -1) {
+                    mutex.wait();
+                } else {
+                    mutex.wait(timeout);
+                    break; // a timed wait is attempted only once
+                }
+            }
+            if (closed || !running || list.isEmpty()) {
+                return null; // closed, stopped, or nothing arrived in time
+            }
+            return list.removeFirst();
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait()
+     */
+    public MessageDispatch dequeueNoWait() {
+        synchronized (mutex) {
+            if (closed || !running || list.isEmpty()) {
+                return null;
+            }
+            return list.removeFirst();
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#peek()
+     */
+    public MessageDispatch peek() {
+        synchronized (mutex) {
+            if (closed || !running || list.isEmpty()) {
+                return null;
+            }
+            return list.getFirst();
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#start()
+     */
+    public void start() {
+        synchronized (mutex) {
+            running = true;
+            mutex.notifyAll();
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#stop()
+     */
+    public void stop() {
+        synchronized (mutex) {
+            running = false;
+            mutex.notifyAll();
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#close()
+     */
+    public void close() {
+        synchronized (mutex) {
+            if (!closed) {
+                running = false;
+                closed = true;
+            }
+            mutex.notifyAll();
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#clear()
+     */
+    public void clear() {
+        synchronized (mutex) {
+            list.clear();
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#isClosed()
+     */
+    public boolean isClosed() {
+        return closed;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#size()
+     */
+    public int size() {
+        synchronized (mutex) {
+            return list.size();
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#getMutex()
+     */
+    public Object getMutex() {
+        return mutex;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#isRunning()
+     */
+    public boolean isRunning() {
+        return running;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#removeAll()
+     */
+    public List<MessageDispatch> removeAll() {
+        synchronized (mutex) {
+            ArrayList<MessageDispatch> rc = new ArrayList<MessageDispatch>(list);
+            list.clear();
+            return rc;
+        }
+    }
+
+    @Override
+    public String toString() {
+        synchronized (mutex) {
+            return list.toString();
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/FifoMessageDispatchChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/FifoMessageDispatchChannel.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java?rev=956918&r1=956917&r2=956918&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java
Tue Jun 22 15:20:49 2010
@@ -16,44 +16,17 @@
  */
 package org.apache.activemq;
 
-import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
-
 import javax.jms.JMSException;
-
 import org.apache.activemq.command.MessageDispatch;
 
-public class MessageDispatchChannel {
+public interface MessageDispatchChannel {
+
+    public abstract void enqueue(MessageDispatch message);
+
+    public abstract void enqueueFirst(MessageDispatch message);
 
-    private final Object mutex = new Object();
-    private final LinkedList<MessageDispatch> list;
-    private boolean closed;
-    private boolean running;
-
-    public MessageDispatchChannel() {
-        this.list = new LinkedList<MessageDispatch>();
-    }
-
-    public void enqueue(MessageDispatch message) {
-        synchronized (mutex) {
-            list.addLast(message);
-            mutex.notify();
-        }
-    }
-
-    public void enqueueFirst(MessageDispatch message) {
-        synchronized (mutex) {
-            list.addFirst(message);
-            mutex.notify();
-        }
-    }
-
-    public boolean isEmpty() {
-        synchronized (mutex) {
-            return list.isEmpty();
-        }
-    }
+    public abstract boolean isEmpty();
 
     /**
      * Used to get an enqueued message. The amount of time this method blocks is
@@ -67,101 +40,28 @@ public class MessageDispatchChannel {
      * @return null if we timeout or if the consumer is closed.
      * @throws InterruptedException
      */
-    public MessageDispatch dequeue(long timeout) throws InterruptedException {
-        synchronized (mutex) {
-            // Wait until the consumer is ready to deliver messages.
-            while (timeout != 0 && !closed && (list.isEmpty() || !running))
{
-                if (timeout == -1) {
-                    mutex.wait();
-                } else {
-                    mutex.wait(timeout);
-                    break;
-                }
-            }
-            if (closed || !running || list.isEmpty()) {
-                return null;
-            }
-            return list.removeFirst();
-        }
-    }
-
-    public MessageDispatch dequeueNoWait() {
-        synchronized (mutex) {
-            if (closed || !running || list.isEmpty()) {
-                return null;
-            }
-            return list.removeFirst();
-        }
-    }
-
-    public MessageDispatch peek() {
-        synchronized (mutex) {
-            if (closed || !running || list.isEmpty()) {
-                return null;
-            }
-            return list.getFirst();
-        }
-    }
-
-    public void start() {
-        synchronized (mutex) {
-            running = true;
-            mutex.notifyAll();
-        }
-    }
-
-    public void stop() {
-        synchronized (mutex) {
-            running = false;
-            mutex.notifyAll();
-        }
-    }
-
-    public void close() {
-        synchronized (mutex) {
-            if (!closed) {
-                running = false;
-                closed = true;
-            }
-            mutex.notifyAll();
-        }
-    }
-
-    public void clear() {
-        synchronized (mutex) {
-            list.clear();
-        }
-    }
-
-    public boolean isClosed() {
-        return closed;
-    }
-
-    public int size() {
-        synchronized (mutex) {
-            return list.size();
-        }
-    }
-
-    public Object getMutex() {
-        return mutex;
-    }
-
-    public boolean isRunning() {
-        return running;
-    }
-
-    public List<MessageDispatch> removeAll() {
-        synchronized (mutex) {
-            ArrayList<MessageDispatch> rc = new ArrayList<MessageDispatch>(list);
-            list.clear();
-            return rc;
-        }
-    }
-
-    public String toString() {
-        synchronized (mutex) {
-            return list.toString();
-        }
-    }
-}
+    public abstract MessageDispatch dequeue(long timeout) throws InterruptedException;
+
+    public abstract MessageDispatch dequeueNoWait();
+
+    public abstract MessageDispatch peek();
+
+    public abstract void start();
+
+    public abstract void stop();
+
+    public abstract void close();
+
+    public abstract void clear();
+
+    public abstract boolean isClosed();
+
+    public abstract int size();
+
+    public abstract Object getMutex();
+
+    public abstract boolean isRunning();
+
+    public abstract List<MessageDispatch> removeAll();
+
+}
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java?rev=956918&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java
Tue Jun 22 15:20:49 2010
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.activemq.command.MessageDispatch;
+
+public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel {
+    private static Integer MAX_PRIORITY = 10;
+    private final Object mutex = new Object();
+    private final LinkedList<MessageDispatch>[] lists;
+    private boolean closed;
+    private boolean running;
+    private int size = 0;
+
+    public SimplePriorityMessageDispatchChannel() {
+        this.lists = new LinkedList[MAX_PRIORITY];
+        for (int i = 0; i < MAX_PRIORITY; i++) {
+            lists[i] = new LinkedList<MessageDispatch>();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see
+     * org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq
+     * .command.MessageDispatch)
+     */
+    public void enqueue(MessageDispatch message) {
+        synchronized (mutex) {
+            getList(message).addLast(message);
+
+            this.size++;
+            mutex.notify();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see
+     * org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq
+     * .command.MessageDispatch)
+     */
+    public void enqueueFirst(MessageDispatch message) {
+        synchronized (mutex) {
+            getList(message).addFirst(message);
+            this.size++;
+            mutex.notify();
+        }
+    }
+
+    /**
+     * Reports whether no messages are queued at any priority; the counter is
+     * read under the mutex, the same lock that is held whenever it is updated.
+     */
+    public boolean isEmpty() {
+        synchronized (mutex) {
+            return this.size == 0;
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#dequeue(long)
+     */
+    public MessageDispatch dequeue(long timeout) throws InterruptedException {
+        synchronized (mutex) {
+            // Wait until the consumer is ready to deliver messages.
+            while (timeout != 0 && !closed && (isEmpty() || !running)) {
+                if (timeout == -1) {
+                    mutex.wait();
+                } else {
+                    mutex.wait(timeout);
+                    break;
+                }
+            }
+            if (closed || !running || isEmpty()) {
+                return null;
+            }
+            return removeFirst();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait()
+     */
+    public MessageDispatch dequeueNoWait() {
+        synchronized (mutex) {
+            if (closed || !running || isEmpty()) {
+                return null;
+            }
+            return removeFirst();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#peek()
+     */
+    public MessageDispatch peek() {
+        synchronized (mutex) {
+            if (closed || !running || isEmpty()) {
+                return null;
+            }
+            return getFirst();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#start()
+     */
+    public void start() {
+        synchronized (mutex) {
+            running = true;
+            mutex.notifyAll();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#stop()
+     */
+    public void stop() {
+        synchronized (mutex) {
+            running = false;
+            mutex.notifyAll();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#close()
+     */
+    public void close() {
+        synchronized (mutex) {
+            if (!closed) {
+                running = false;
+                closed = true;
+            }
+            mutex.notifyAll();
+        }
+    }
+
+    /**
+     * Discards every queued message at every priority and resets the size counter.
+     */
+    public void clear() {
+        synchronized (mutex) {
+            for (int i = 0; i < MAX_PRIORITY; i++) {
+                lists[i].clear();
+            }
+            this.size = 0; // otherwise isEmpty()/size() keep counting the discarded messages
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#isClosed()
+     */
+    public boolean isClosed() {
+        return closed;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#size()
+     */
+    public int size() {
+        synchronized (mutex) {
+            return this.size;
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#getMutex()
+     */
+    public Object getMutex() {
+        return mutex;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.activemq.MessageDispatchChannelI#isRunning()
+     */
+    public boolean isRunning() {
+        return running;
+    }
+
+    /**
+     * Drains the channel and returns the removed messages, highest priority
+     * first and in queue order within a priority; the size counter is reset.
+     */
+    public List<MessageDispatch> removeAll() {
+        synchronized (mutex) {
+            ArrayList<MessageDispatch> result = new ArrayList<MessageDispatch>(this.size);
+            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
+                List<MessageDispatch> list = lists[i];
+                result.addAll(list);
+                list.clear();
+            }
+            this.size = 0; // every per-priority list was just emptied
+            return result;
+        }
+    }
+
+    @Override
+    public String toString() {
+        synchronized (mutex) { // the lists are mutated under this lock, so read them under it too
+            StringBuilder result = new StringBuilder();
+            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
+                result.append(i).append(":{").append(lists[i].toString()).append("}");
+            }
+            return result.toString();
+        }
+    }
+
+    /** Returns the message priority clamped to 0..MAX_PRIORITY-1, or the default when no message is attached. */
+    protected int getPriority(MessageDispatch message) {
+        if (message.getMessage() == null) {
+            return Message.DEFAULT_PRIORITY;
+        }
+        // Clamp so the result is always a valid index into lists.
+        return Math.min(Math.max(message.getMessage().getPriority(), 0), MAX_PRIORITY - 1);
+    }
+
+    protected LinkedList<MessageDispatch> getList(MessageDispatch md) {
+        return lists[getPriority(md)];
+    }
+
+    private final MessageDispatch removeFirst() {
+        if (this.size > 0) {
+            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
+                LinkedList<MessageDispatch> list = lists[i];
+                if (!list.isEmpty()) {
+                    this.size--;
+                    return list.removeFirst();
+                }
+            }
+        }
+        return null;
+    }
+
+    private final MessageDispatch getFirst() {
+        if (this.size > 0) {
+            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
+                LinkedList<MessageDispatch> list = lists[i];
+                if (!list.isEmpty()) {
+                    return list.getFirst();
+                }
+            }
+        }
+        return null;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message