qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r795958 [2/3] - in /qpid/branches/java-broker-0-10/qpid/java: broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/src/main/java/org/apache/q...
Date Mon, 20 Jul 2009 19:05:08 GMT
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java Mon Jul 20 19:05:05 2009
@@ -36,7 +36,7 @@
         this.xpath = xpath;
     }
 
-    public Object evaluate(Filterable message) throws AMQException {
+    public Object evaluate(Filterable message) {
         return Boolean.FALSE;
     }
 
@@ -49,7 +49,7 @@
      * @return true if the expression evaluates to Boolean.TRUE.
      * @throws AMQException
      */
-    public boolean matches(Filterable message) throws AMQException
+    public boolean matches(Filterable message) 
     {
         Object object = evaluate(message);
         return object!=null && object==Boolean.TRUE;            

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java Mon Jul 20 19:05:05 2009
@@ -43,7 +43,7 @@
         this.xpath = xpath;
     }
     
-    public boolean evaluate(Filterable m) throws AMQException
+    public boolean evaluate(Filterable m) 
     {
         // TODO - we would have to check the content type and then evaluate the content
         //        here... is this really a feature we wish to implement? - RobG

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java Mon Jul 20 19:05:05 2009
@@ -1,11 +1,8 @@
 package org.apache.qpid.server.flow;
 
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
 
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.Set;
-import java.util.HashSet;
 
 /*
 *
@@ -52,7 +49,7 @@
         return _bytesCredit.get() > 0L;
     }
 
-    public boolean useCreditForMessage(AMQMessage msg)
+    public boolean useCreditForMessage(ServerMessage msg)
     {
         final long msgSize = msg.getSize();
         if(hasCredit())

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java Mon Jul 20 19:05:05 2009
@@ -1,6 +1,7 @@
 package org.apache.qpid.server.flow;
 
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
 
 /*
 *
@@ -40,5 +41,5 @@
 
     public boolean hasCredit();
 
-    public boolean useCreditForMessage(AMQMessage msg);
+    public boolean useCreditForMessage(ServerMessage msg);
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java Mon Jul 20 19:05:05 2009
@@ -1,6 +1,6 @@
 package org.apache.qpid.server.flow;
 
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
 
 /*
 *
@@ -37,7 +37,7 @@
         return true;
     }
 
-    public boolean useCreditForMessage(AMQMessage msg)
+    public boolean useCreditForMessage(ServerMessage msg)
     {
         return true;
     }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java Mon Jul 20 19:05:05 2009
@@ -1,8 +1,6 @@
 package org.apache.qpid.server.flow;
 
-import org.apache.qpid.server.queue.AMQMessage;
-
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.qpid.server.message.ServerMessage;
 
 /*
 *
@@ -54,7 +52,7 @@
         return (_messageCredit > 0L) && ( _bytesCredit > 0L );
     }
 
-    public synchronized boolean useCreditForMessage(AMQMessage msg)
+    public synchronized boolean useCreditForMessage(ServerMessage msg)
     {
         if(_messageCredit == 0L)
         {

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java Mon Jul 20 19:05:05 2009
@@ -1,6 +1,6 @@
 package org.apache.qpid.server.flow;
 
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
 
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -50,7 +50,7 @@
         return _messageCredit.get() > 0L;
     }
 
-    public boolean useCreditForMessage(AMQMessage msg)
+    public boolean useCreditForMessage(ServerMessage msg)
     {
         if(hasCredit())
         {

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java Mon Jul 20 19:05:05 2009
@@ -20,7 +20,7 @@
 */
 package org.apache.qpid.server.flow;
 
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
 
 public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
 {
@@ -123,7 +123,7 @@
                 && (_messageCreditLimit == 0L || _messageCredit > 0);
     }
 
-    public synchronized boolean useCreditForMessage(final AMQMessage msg)
+    public synchronized boolean useCreditForMessage(final ServerMessage msg)
     {
         if(_messageCreditLimit != 0L)
         {

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Mon Jul 20 19:05:05 2009
@@ -41,6 +41,7 @@
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.security.access.Permission;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -130,8 +131,16 @@
             throws AMQException
             {
                 singleMessageCredit.useCreditForMessage(entry.getMessage());
-                session.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
-                                                                        deliveryTag, queue.getMessageCount());
+                if(entry.getMessage() instanceof AMQMessage)
+                {
+                    session.getProtocolOutputConverter().writeGetOk((AMQMessage)(entry.getMessage()), channel.getChannelId(),
+                                                                            deliveryTag, queue.getMessageCount());
+                }
+                else
+                {
+                    //TODO Convert AMQP 0-10 message
+                    throw new RuntimeException("Not implemented conversion of 0-10 message");
+                }
 
             }
         };

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Mon Jul 20 19:05:05 2009
@@ -87,7 +87,7 @@
                 return;
             }
 
-            if (!message.getMessage().isReferenced())
+            if (message.getMessage() == null)
             {
                 _logger.warn("Message as already been purged, unable to Reject.");
                 return;
@@ -96,7 +96,7 @@
 
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
+                _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
                               ": Requeue:" + body.getRequeue() +
                               //": Resend:" + evt.getMethod().resend +
                               " on channel:" + channel.debugIdentity());

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Mon Jul 20 19:05:05 2009
@@ -26,6 +26,7 @@
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -61,7 +62,7 @@
             {
                 throw body.getChannelNotFoundException(channelId);
             }
-
+            StoreContext.setCurrentContext(channel.getStoreContext());
             channel.commit();
 
             MethodRegistry methodRegistry = session.getMethodRegistry();
@@ -74,5 +75,9 @@
         {
             throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
         }
+        finally
+        {
+            StoreContext.clearCurrentContext();
+        }
     }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Mon Jul 20 19:05:05 2009
@@ -36,8 +36,6 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.AMQException;
 
-import org.apache.mina.common.ByteBuffer;
-
 import java.util.Iterator;
 
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Mon Jul 20 19:05:05 2009
@@ -1,25 +1,25 @@
 package org.apache.qpid.server.output.amqp0_9;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import org.apache.mina.common.ByteBuffer;
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Mon Jul 20 19:05:05 2009
@@ -33,6 +33,7 @@
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.message.*;
 
 
 import java.util.Iterator;
@@ -41,12 +42,12 @@
 /**
  * A deliverable message.
  */
-public class AMQMessage implements Filterable<AMQException>
+public class AMQMessage implements Filterable, ServerMessage
 {
     /** Used for debugging purposes. */
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
 
-    private final AtomicInteger _referenceCount = new AtomicInteger(1);
+    private final AtomicInteger _referenceCount = new AtomicInteger(0);
 
     private final AMQMessageHandle _messageHandle;
 
@@ -72,7 +73,7 @@
 
     private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
     private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
-
+    private final AMQMessageHeader _messageHeader;
 
 
     /**
@@ -202,6 +203,7 @@
         _messageHandle = factory.createMessageHandle(messageId, store, true);
         _storeContext = txnConext.getStoreContext();
         _size = _messageHandle.getBodySize(txnConext.getStoreContext());
+        _messageHeader = new ContentHeaderBodyAdapter(_messageHandle.getContentHeaderBody(txnConext.getStoreContext()));
     }
 
         /**
@@ -221,6 +223,7 @@
     {
         _messageHandle = messageHandle;
         _storeContext = storeConext;
+        _messageHeader = new ContentHeaderBodyAdapter(_messageHandle.getContentHeaderBody(storeConext));
 
         if(info.isImmediate())
         {
@@ -234,6 +237,7 @@
     protected AMQMessage(AMQMessage msg) throws AMQException
     {
         _messageHandle = msg._messageHandle;
+        _messageHeader = msg._messageHeader;
         _storeContext = msg._storeContext;
         _flags = msg._flags;
         _size = msg._size;
@@ -315,12 +319,11 @@
      * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
      * message store.
      *
-     * @param storeContext
      *
      * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
      *                                 failed
      */
-    public void decrementReference(StoreContext storeContext) throws MessageCleanupException
+    public void decrementReference() throws MessageCleanupException
     {
 
         int count = _referenceCount.decrementAndGet();
@@ -342,13 +345,12 @@
                 // and the handle has not yet been constructed
                 if (_messageHandle != null)
                 {
-                    _messageHandle.removeMessage(storeContext);
+                    _messageHandle.removeMessage(StoreContext.getCurrentContext());
                 }
             }
             catch (AMQException e)
             {
-                // to maintain consistency, we revert the count
-                incrementReference();
+
                 throw new MessageCleanupException(getMessageId(), e);
             }
         }
@@ -373,7 +375,18 @@
         return (_flags & DELIVERED_TO_CONSUMER) != 0;
     }
 
-    public boolean isPersistent() throws AMQException
+    public String getRoutingKey()
+    {
+        // TODO
+        return null;
+    }
+
+    public AMQMessageHeader getMessageHeader()
+    {
+        return _messageHeader;
+    }
+
+    public boolean isPersistent()
     {
         return _messageHandle.isPersistent();
     }
@@ -455,6 +468,26 @@
 
     }
 
+    public boolean isImmediate()
+    {
+        return (_flags & IMMEDIATE) == IMMEDIATE;
+    }
+
+    public long getExpiration()
+    {
+        return _expiration;
+    }
+
+    public MessageReference newReference()
+    {
+        return new AMQMessageReference(this);
+    }
+
+    public Long getMessageNumber()
+    {
+        return getMessageId();
+    }
+
     public Object getPublisherClientInstance()
     {
         //todo store sessionIdentifier/client id with message in store

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Jul 20 19:05:05 2009
@@ -20,15 +20,13 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.Configuration;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -88,7 +86,7 @@
     int delete() throws AMQException;
 
 
-    QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
+    QueueEntry enqueue(ServerMessage message) throws AMQException;
 
     void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Mon Jul 20 19:05:05 2009
@@ -36,6 +36,7 @@
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.message.ServerMessage;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -246,7 +247,7 @@
     /**
      * Checks if there is any notification to be send to the listeners
      */
-    public void checkForNotification(AMQMessage msg) throws AMQException
+    public void checkForNotification(ServerMessage msg) throws AMQException
     {
 
         final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
@@ -333,48 +334,60 @@
             throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
         }
 
-        AMQMessage msg = entry.getMessage();
-        // get message content
-        Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
-        List<Byte> msgContent = new ArrayList<Byte>();
-        while (cBodies.hasNext())
+        ServerMessage serverMsg = entry.getMessage();
+
+        if(serverMsg instanceof AMQMessage)
         {
-            ContentChunk body = cBodies.next();
-            if (body.getSize() != 0)
+            AMQMessage msg = (AMQMessage) serverMsg;
+            // get message content
+            Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
+            List<Byte> msgContent = new ArrayList<Byte>();
+            while (cBodies.hasNext())
             {
+                ContentChunk body = cBodies.next();
                 if (body.getSize() != 0)
                 {
-                    ByteBuffer slice = body.getData().slice();
-                    for (int j = 0; j < slice.limit(); j++)
+                    if (body.getSize() != 0)
                     {
-                        msgContent.add(slice.get());
+                        ByteBuffer slice = body.getData().slice();
+                        for (int j = 0; j < slice.limit(); j++)
+                        {
+                            msgContent.add(slice.get());
+                        }
                     }
                 }
             }
-        }
 
-        try
-        {
-            // Create header attributes list
-            CommonContentHeaderProperties headerProperties =
-                (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
-            String mimeType = null, encoding = null;
-            if (headerProperties != null)
+
+            try
             {
-                AMQShortString mimeTypeShortSting = headerProperties.getContentType();
-                mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
-                encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
-            }
+                // Create header attributes list
+                CommonContentHeaderProperties headerProperties =
+                    (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
+                String mimeType = null, encoding = null;
+                if (headerProperties != null)
+                {
+                    AMQShortString mimeTypeShortSting = headerProperties.getContentType();
+                    mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
+                    encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
+                }
 
-            Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+                Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+
+                return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
+            }
+            catch (AMQException e)
+            {
+                JMException jme = new JMException("Error creating header attributes list: " + e);
+                jme.initCause(e);
+                throw jme;
+            }
 
-            return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
         }
-        catch (AMQException e)
+        else
         {
-            JMException jme = new JMException("Error creating header attributes list: " + e);
-            jme.initCause(e);
-            throw jme;
+            // TODO 0-10 Messages for MBean
+            return null;
         }
     }
 
@@ -398,13 +411,21 @@
             for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
             {
                 long position = i;
-                AMQMessage msg = list.get(i - 1).getMessage();
-                ContentHeaderBody headerBody = msg.getContentHeaderBody();
-                // Create header attributes list
-                String[] headerAttributes = getMessageHeaderProperties(headerBody);
-                Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position};
-                CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues);
-                _messageList.put(messageData);
+                ServerMessage serverMsg = list.get(i - 1).getMessage();
+                if(serverMsg instanceof AMQMessage)
+                {
+                    AMQMessage msg = (AMQMessage) serverMsg;
+                    ContentHeaderBody headerBody = msg.getContentHeaderBody();
+                    // Create header attributes list
+                    String[] headerAttributes = getMessageHeaderProperties(headerBody);
+                    Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position };
+                    CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues);
+                    _messageList.put(messageData);
+                }
+                else
+                {
+                    // TODO 0-10 Message
+                }
             }
         }
         catch (AMQException e)

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Mon Jul 20 19:05:05 2009
@@ -68,4 +68,9 @@
     {
         return _queueMap.values();
     }
+
+    public AMQQueue getQueue(String queue)
+    {
+        return getQueue(new AMQShortString(queue));
+    }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java Mon Jul 20 19:05:05 2009
@@ -22,12 +22,13 @@
 
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.AMQMessageHeader;
 
-public interface Filterable<E extends Exception>
+public interface Filterable
 {
-    ContentHeaderBody getContentHeaderBody() throws E;
+    AMQMessageHeader getMessageHeader();
 
-    boolean isPersistent() throws E;
+    boolean isPersistent();
 
     boolean isRedelivered();
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Mon Jul 20 19:05:05 2009
@@ -28,16 +28,20 @@
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.exchange.NoRouteException;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ContentHeaderBodyAdapter;
+import org.apache.qpid.server.message.AMQMessageReference;
 import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
-import java.util.Collection;
 
-public class IncomingMessage implements Filterable<RuntimeException>
+public class IncomingMessage implements Filterable, InboundMessage
 {
 
     /** Used for debugging purposes. */
@@ -73,6 +77,7 @@
     private long _expiration;
     
     private Exchange _exchange;
+    private AMQMessageHeader _messageHeader;
 
 
     public IncomingMessage(final Long messageId,
@@ -90,6 +95,7 @@
     public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
     {
         _contentHeaderBody = contentHeaderBody;
+        _messageHeader = new ContentHeaderBodyAdapter(contentHeaderBody);
     }
 
     public void setExpiration()
@@ -158,17 +164,19 @@
         }
 
         AMQMessage message = null;
+        AMQMessageReference ref = null;
 
         try
         {
             // first we allow the handle to know that the message has been fully received. This is useful if it is
             // maintaining any calculated values based on content chunks
             _messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(),
-                                                          _messagePublishInfo, getContentHeaderBody());
+                                                          _messagePublishInfo, getContentHeader());
 
             
             
             message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo);
+            ref = (AMQMessageReference) message.newReference();
 
             message.setExpiration(_expiration);
             message.setClientIdentifier(_publisher.getSessionIdentifier());
@@ -177,8 +185,8 @@
             // now that it has all been received, before we attempt delivery
             _txnContext.messageFullyReceived(isPersistent());
             
-            AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ?
-                     ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null; 
+            AMQShortString userID = getContentHeader().properties instanceof BasicContentHeaderProperties ?
+                     ((BasicContentHeaderProperties) getContentHeader().properties).getUserId() : null;
             
             if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
             {
@@ -202,7 +210,7 @@
             {
                 int offset;
                 final int queueCount = _destinationQueues.size();
-                message.incrementReference(queueCount);
+
                 if(queueCount == 1)
                 {
                     offset = 0;
@@ -233,7 +241,8 @@
         finally
         {
             // Remove refence for routing process . Reference count should now == delivered queue count
-            if(message != null) message.decrementReference(_txnContext.getStoreContext());
+
+            if(ref != null) ref.release();
         }
 
     }
@@ -250,40 +259,51 @@
 
     public boolean allContentReceived()
     {
-        return (_bodyLengthReceived == getContentHeaderBody().bodySize);
+        return (_bodyLengthReceived == getContentHeader().bodySize);
     }
 
-    public AMQShortString getExchange() throws AMQException
+    public AMQShortString getExchange()
     {
         return _messagePublishInfo.getExchange();
     }
 
-    public AMQShortString getRoutingKey() throws AMQException
+    public String getRoutingKey()
     {
-        return _messagePublishInfo.getRoutingKey();
+        return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
     }
 
-    public boolean isMandatory() throws AMQException
+    public String getBinding()
+    {
+        return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
+    }
+
+
+    public boolean isMandatory()
     {
         return _messagePublishInfo.isMandatory();
     }
 
 
-    public boolean isImmediate() throws AMQException
+    public boolean isImmediate()
     {
         return _messagePublishInfo.isImmediate();
     }
 
-    public ContentHeaderBody getContentHeaderBody()
+    public ContentHeaderBody getContentHeader()
     {
         return _contentHeaderBody;
     }
 
 
+    public AMQMessageHeader getMessageHeader()
+    {
+        return _messageHeader;
+    }
+
     public boolean isPersistent()
     {
-        return getContentHeaderBody().properties instanceof BasicContentHeaderProperties &&
-             ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() == 
+        return getContentHeader().properties instanceof BasicContentHeaderProperties &&
+             ((BasicContentHeaderProperties) getContentHeader().properties).getDeliveryMode() ==
                                                              BasicContentHeaderProperties.PERSISTENT;
     }
     
@@ -292,6 +312,11 @@
         return false;
     }
 
+    public long getSize()
+    {
+        return getContentHeader().bodySize;
+    }
+
     public void setMessageStore(final MessageStore messageStore)
     {
         _messageStore = messageStore;
@@ -309,7 +334,8 @@
 
     public void route() throws AMQException
     {
-        _exchange.route(this);
+        enqueue(_exchange.route(this));
+
     }
 
     public void enqueue(final ArrayList<AMQQueue> queues)

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java Mon Jul 20 19:05:05 2009
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.message.ServerMessage;
 
 /**
  * NoConsumersException is a {@link RequiredDeliveryException} that represents the failure case where an immediate
@@ -35,7 +36,7 @@
  */
 public class NoConsumersException extends RequiredDeliveryException
 {
-    public NoConsumersException(AMQMessage message)
+    public NoConsumersException(ServerMessage message)
     {
         super("Immediate delivery is not possible.", message);
     }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Mon Jul 20 19:05:05 2009
@@ -21,13 +21,14 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
 
 public enum NotificationCheck
 {
 
     MESSAGE_COUNT_ALERT
     {
-        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
         {
             int msgCount;
             final long maximumMessageCount = queue.getMaximumMessageCount();
@@ -41,26 +42,19 @@
     },
     MESSAGE_SIZE_ALERT(true)
     {
-        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
         {
             final long maximumMessageSize = queue.getMaximumMessageSize();
             if(maximumMessageSize != 0)
             {
                 // Check for threshold message size
                 long messageSize;
-                try
-                {
-                    messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
-                }
-                catch (AMQException e)
-                {
-                    messageSize = 0;
-                }
+                messageSize = (msg == null) ? 0 : msg.getSize();
 
 
                 if (messageSize >= maximumMessageSize)
                 {
-                    listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+                    listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]");
                     return true;
                 }
             }
@@ -70,7 +64,7 @@
     },
     QUEUE_DEPTH_ALERT
     {
-        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
         {
             // Check for threshold queue depth in bytes
             final long maximumQueueDepth = queue.getMaximumQueueDepth();
@@ -91,7 +85,7 @@
     },
     MESSAGE_AGE_ALERT
     {
-        boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+        boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
         {
 
             final long maxMessageAge = queue.getMaximumMessageAge();
@@ -133,6 +127,6 @@
         return _messageSpecific;
     }
 
-    abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+    abstract boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener);
 
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Mon Jul 20 19:05:05 2009
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.framing.CommonContentHeaderProperties;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
 
 public class PriorityQueueList implements QueueEntryList
 {
@@ -52,26 +53,18 @@
         return _queue;
     }
 
-    public QueueEntry add(AMQMessage message)
+    public QueueEntry add(ServerMessage message)
     {
-        try
+        int index = message.getMessageHeader().getPriority() - _priorityOffset;
+        if(index >= _priorities)
         {
-            int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
-            if(index >= _priorities)
-            {
-                index = _priorities-1;
-            }
-            else if(index < 0)
-            {
-                index = 0;
-            }
-            return _priorityLists[index].add(message);
+            index = _priorities-1;
         }
-        catch (AMQException e)
+        else if(index < 0)
         {
-            // TODO - fix AMQ Exception
-            throw new RuntimeException(e);
+            index = 0;
         }
+        return _priorityLists[index].add(message);
 
     }
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Mon Jul 20 19:05:05 2009
@@ -3,6 +3,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.ServerMessage;
 
 /*
 *
@@ -133,7 +134,7 @@
 
     AMQQueue getQueue();
 
-    AMQMessage getMessage();
+    ServerMessage getMessage();
 
     long getSize();
 
@@ -155,8 +156,6 @@
 
     void release();
 
-    String debugIdentity();
-
     boolean immediateAndNotDelivered();
 
     void setRedelivered(boolean b);

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Jul 20 19:05:05 2009
@@ -23,6 +23,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.log4j.Logger;
 
 import java.util.Set;
@@ -42,7 +44,7 @@
 
     private final SimpleQueueEntryList _queueEntryList;
 
-    private AMQMessage _message;
+    private MessageReference _message;
 
 
     private Set<Subscription> _rejectedBy = null;
@@ -75,6 +77,8 @@
     private volatile long _entryId;
 
     volatile QueueEntryImpl _next;
+    private boolean _deliveredToConsumer;
+    private boolean _redelivered;
 
 
     QueueEntryImpl(SimpleQueueEntryList queueEntryList)
@@ -84,18 +88,18 @@
     }
 
 
-    public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
+    public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId)
     {
         _queueEntryList = queueEntryList;
-        _message = message;
+        _message = message == null ? null : message.newReference();
 
         _entryIdUpdater.set(this, entryId);
     }
 
-    public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
+    public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message)
     {
         _queueEntryList = queueEntryList;
-        _message = message;
+        _message = message == null ? null :  message.newReference();
     }
 
     protected void setEntryId(long entryId)
@@ -113,9 +117,9 @@
         return _queueEntryList.getQueue();
     }
 
-    public AMQMessage getMessage()
+    public ServerMessage getMessage()
     {
-        return _message;
+        return  _message == null ? null : _message.getMessage();
     }
 
     public long getSize()
@@ -125,12 +129,21 @@
 
     public boolean getDeliveredToConsumer()
     {
-        return getMessage().getDeliveredToConsumer();
+        return _deliveredToConsumer;
     }
 
     public boolean expired() throws AMQException
     {
-        return getMessage().expired(getQueue());
+        long expiration = getMessage().getExpiration();
+        if (expiration != 0L)
+        {
+            long now = System.currentTimeMillis();
+
+            return (now > expiration);
+        }
+
+        return false;
+
     }
 
     public boolean isAcquired()
@@ -167,7 +180,7 @@
 
     public void setDeliveredToSubscription()
     {
-        getMessage().setDeliveredToConsumer();
+        _deliveredToConsumer = true;
     }
 
     public void release()
@@ -175,20 +188,15 @@
         _stateUpdater.set(this,AVAILABLE_STATE);
     }
 
-    public String debugIdentity()
-    {
-        return getMessage().debugIdentity();
-    }
-
 
     public boolean immediateAndNotDelivered() 
     {
-        return _message.immediateAndNotDelivered();
+        return getMessage().isImmediate() && !_deliveredToConsumer;
     }
 
     public void setRedelivered(boolean b)
     {
-        getMessage().setRedelivered(b);
+        _redelivered = b;
     }
 
     public Subscription getDeliveredSubscription()
@@ -223,7 +231,7 @@
         }
         else
         {
-            _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+            _log.warn("Requesting rejection by null subscriber:" + this);
         }
     }
 
@@ -284,7 +292,9 @@
     {
         if(delete())
         {
-            getMessage().decrementReference(storeContext);
+            StoreContext sc = StoreContext.setCurrentContext(storeContext);
+            _message.release();
+            StoreContext.setCurrentContext(sc);
         }
     }
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Mon Jul 20 19:05:05 2009
@@ -20,11 +20,13 @@
 */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.message.ServerMessage;
+
 public interface QueueEntryList
 {
     AMQQueue getQueue();
 
-    QueueEntry add(AMQMessage message);
+    QueueEntry add(ServerMessage message);
 
     QueueEntry next(QueueEntry node);
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Mon Jul 20 19:05:05 2009
@@ -40,4 +40,5 @@
 
     Collection<AMQQueue> getQueues();
 
+    AMQQueue getQueue(String queue);
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Jul 20 19:05:05 2009
@@ -29,6 +29,8 @@
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
 
 /*
 *
@@ -319,7 +321,7 @@
 
     // ------ Enqueue / Dequeue
 
-    public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
+    public QueueEntry enqueue(ServerMessage message) throws AMQException
     {
 
         incrementQueueCount();
@@ -406,8 +408,10 @@
             }
         }
 
+
         if (entry.immediateAndNotDelivered())
         {
+            StoreContext storeContext = StoreContext.getCurrentContext();
             dequeue(storeContext, entry);
             entry.dispose(storeContext);
         }
@@ -462,7 +466,7 @@
         // Simple Queues don't :-)
     }
 
-    private void incrementQueueSize(final AMQMessage message)
+    private void incrementQueueSize(final ServerMessage message)
     {
         getAtomicQueueSize().addAndGet(message.getSize());
     }
@@ -573,10 +577,10 @@
 
         try
         {
-            AMQMessage msg = entry.getMessage();
+            ServerMessage msg = entry.getMessage();
             if (msg.isPersistent())
             {
-                _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId());
+                _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageNumber());
             }
             //entry.dispose(storeContext);
 
@@ -767,7 +771,7 @@
 
             public boolean accept(QueueEntry entry)
             {
-                final long messageId = entry.getMessage().getMessageId();
+                final long messageId = entry.getMessage().getMessageNumber();
                 return messageId >= fromMessageId && messageId <= toMessageId;
             }
 
@@ -786,7 +790,7 @@
 
             public boolean accept(QueueEntry entry)
             {
-                _complete = entry.getMessage().getMessageId() == messageId;
+                _complete = entry.getMessage().getMessageNumber() == messageId;
                 return _complete;
             }
 
@@ -828,7 +832,7 @@
 
             public boolean accept(QueueEntry entry)
             {
-                final long messageId = entry.getMessage().getMessageId();
+                final long messageId = entry.getMessage().getMessageNumber();
                 return (messageId >= fromMessageId)
                        && (messageId <= toMessageId)
                        && entry.acquire();
@@ -847,11 +851,11 @@
             // Move the messages in on the message store.
             for (QueueEntry entry : entries)
             {
-                AMQMessage message = entry.getMessage();
+                ServerMessage message = entry.getMessage();
 
                 if (message.isPersistent() && toQueue.isDurable())
                 {
-                    store.enqueueMessage(storeContext, toQueue, message.getMessageId());
+                    store.enqueueMessage(storeContext, toQueue, message.getMessageNumber());
                 }
                 // dequeue does not decrement the refence count
                 entry.dequeue(storeContext);
@@ -882,9 +886,11 @@
 
         try
         {
+            StoreContext.setCurrentContext(storeContext);
+
             for (QueueEntry entry : entries)
             {
-                toQueue.enqueue(storeContext, entry.getMessage());
+                toQueue.enqueue(entry.getMessage());
                 entry.delete();
             }
         }
@@ -896,6 +902,11 @@
         {
             throw new RuntimeException(e);
         }
+        finally
+        {
+            StoreContext.clearCurrentContext();
+
+        }
 
     }
 
@@ -912,17 +923,9 @@
 
             public boolean accept(QueueEntry entry)
             {
-                final long messageId = entry.getMessage().getMessageId();
-                if ((messageId >= fromMessageId)
-                    && (messageId <= toMessageId))
-                {
-                    if (!entry.isDeleted())
-                    {
-                        return entry.getMessage().incrementReference();
-                    }
-                }
-
-                return false;
+                final long messageId = entry.getMessage().getMessageNumber();
+                return ((messageId >= fromMessageId)
+                    && (messageId <= toMessageId));
             }
 
             public boolean filterComplete()
@@ -938,11 +941,15 @@
             // Move the messages in on the message store.
             for (QueueEntry entry : entries)
             {
-                AMQMessage message = entry.getMessage();
+                ServerMessage message = entry.getMessage();
 
-                if (message.isReferenced() && message.isPersistent() && toQueue.isDurable())
+                if (message.isPersistent() && toQueue.isDurable())
                 {
-                    store.enqueueMessage(storeContext, toQueue, message.getMessageId());
+
+                    StoreContext sc = StoreContext.setCurrentContext(storeContext);
+                    store.enqueueMessage(storeContext, toQueue, message.getMessageNumber());
+                    StoreContext.setCurrentContext(sc);
+
                 }
             }
 
@@ -973,9 +980,11 @@
         {
             for (QueueEntry entry : entries)
             {
-                if (entry.getMessage().isReferenced())
+
+                ServerMessage message = entry.getMessage();
+                if (message != null)
                 {
-                    toQueue.enqueue(storeContext, entry.getMessage());
+                    toQueue.enqueue(entry.getMessage());
                 }
             }
         }
@@ -1001,7 +1010,7 @@
             {
                 QueueEntry node = queueListIterator.getNode();
 
-                final long messageId = node.getMessage().getMessageId();
+                final long messageId = node.getMessage().getMessageNumber();
 
                 if ((messageId >= fromMessageId)
                     && (messageId <= toMessageId)
@@ -1418,7 +1427,7 @@
         }
     }
 
-    @Override
+
     public void checkMessageStatus() throws AMQException
     {
 
@@ -1581,7 +1590,7 @@
         for (int i = 0; i < num && !it.atTail(); i++)
         {
             it.advance();
-            ids.add(it.getNode().getMessage().getMessageId());
+            ids.add(it.getNode().getMessage().getMessageNumber());
         }
         return ids;
     }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Mon Jul 20 19:05:05 2009
@@ -1,5 +1,8 @@
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
+
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 /*
@@ -74,7 +77,7 @@
     }
 
 
-    public QueueEntry add(AMQMessage message)
+    public QueueEntry add(ServerMessage message)
     {
         QueueEntryImpl node = new QueueEntryImpl(this, message);
         for (;;)

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Mon Jul 20 19:05:05 2009
@@ -1352,7 +1352,10 @@
 
         public void process() throws AMQException
         {
-            _queue.enqueue(_context, _message);
+            StoreContext.setCurrentContext(_context);
+            _queue.enqueue(_message);
+            StoreContext.clearCurrentContext();
+
 
         }
 
@@ -1414,7 +1417,7 @@
 
                 if(message != null)
                 {
-                    message.incrementReference();
+//                    message.incrementReference();
                 }
                 else
                 {

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Mon Jul 20 19:05:05 2009
@@ -32,9 +32,12 @@
 {
     private static final Logger _logger = Logger.getLogger(StoreContext.class);
 
+    private static final ThreadLocal<StoreContext> _threadLocalContext = new ThreadLocal<StoreContext>();
+
     private String _name;
     private Object _payload;
 
+
     public StoreContext()
     {
         _name = "StoreContext";
@@ -68,4 +71,24 @@
     {
         return "<_name = " + _name + ", _payload = " + _payload + ">";
     }
+
+
+    public static StoreContext setCurrentContext(StoreContext context)
+    {
+        StoreContext sc = getCurrentContext();
+        _threadLocalContext.set(context);
+        return sc;
+    }
+
+    public static StoreContext getCurrentContext()
+    {
+        return _threadLocalContext.get();
+    }
+
+    public static void clearCurrentContext()
+    {
+        _threadLocalContext.set(null);
+    }
+
+
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Mon Jul 20 19:05:05 2009
@@ -32,8 +32,10 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.filter.FilterManager;
@@ -377,16 +379,19 @@
         {
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity());
+                _logger.debug("Subscription:" + this + " rejected message:" + entry);
             }
 //            return false;
         }
 
         if (_noLocal)
         {
-            //todo - client id should be recoreded so we don't have to handle
+
+            AMQMessage message = (AMQMessage) entry.getMessage();
+
+            //todo - client id should be recorded so we don't have to handle
             // the case where this is null.
-            final Object publisherId = entry.getMessage().getPublisherClientInstance();
+            final Object publisherId = message.getPublisherClientInstance();
 
             // We don't want local messages so check to see if message is one we sent
             Object localInstance;
@@ -404,8 +409,8 @@
 
                 localInstance = getProtocolSession().getClientIdentifier();
 
-                //todo - client id should be recoreded so we don't have to do the null check
-                if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier()))
+                //todo - client id should be recorded so we don't have to do the null check
+                if (localInstance != null && localInstance.equals(message.getPublisherIdentifier()))
                 {
                     return false;
                 }
@@ -417,7 +422,7 @@
 
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
+            _logger.debug("(" + this + ") checking filters for message (" + entry);
         }
         return checkFilters(entry);
 

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=795958&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Mon Jul 20 19:05:05 2009
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ServerConnection extends Connection
+{
+    @Override
+    protected void invoke(Method method)
+    {
+        super.invoke(method);
+    }
+
+    @Override
+    protected void setState(State state)
+    {
+        super.setState(state);    
+    }
+
+    @Override
+    public ServerConnectionDelegate getConnectionDelegate()
+    {
+        return (ServerConnectionDelegate) super.getConnectionDelegate();
+    }
+
+    public void setConnectionDelegate(ServerConnectionDelegate delegate)
+    {
+        super.setConnectionDelegate(delegate);
+    }
+
+    private VirtualHost _virtualHost;
+
+
+    public VirtualHost getVirtualHost()
+    {
+        return _virtualHost;
+    }
+
+    public void setVirtualHost(VirtualHost virtualHost)
+    {
+        _virtualHost = virtualHost;
+    }
+}

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=795958&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Mon Jul 20 19:05:05 2009
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.*;
+
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslException;
+import java.util.*;
+
+
+public class ServerConnectionDelegate extends ServerDelegate
+{
+
+    private String _localFQDN;
+    private final IApplicationRegistry _appRegistry;
+
+
+    public ServerConnectionDelegate(IApplicationRegistry appRegistry,
+                                    String localFQDN)
+    {
+        this(Collections.EMPTY_MAP, Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
+    }
+
+
+    public ServerConnectionDelegate(Map<String, Object> properties,
+                                    List<Object> locales,
+                                    IApplicationRegistry appRegistry,
+                                    String localFQDN)
+    {
+        super(properties, parseToList(appRegistry.getAuthenticationManager().getMechanisms()), locales);
+        _appRegistry = appRegistry;
+        _localFQDN = localFQDN;
+    }
+
+    private static List<Object> parseToList(String mechanisms)
+    {
+        List<Object> list = new ArrayList<Object>();
+        StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
+        while(tokenizer.hasMoreTokens())
+        {
+            list.add(tokenizer.nextToken());
+        }
+        return list;
+    }
+
+    @Override public ServerSession getSession(Connection conn, SessionAttach atc)
+    {
+
+        SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry);
+
+        ServerSession ssn = new ServerSession(conn, serverSessionDelegate,  new Binary(atc.getName()), 0);
+        //ssn.setSessionListener(new Echo());
+        return ssn;
+    }
+
+
+
+
+    @Override
+    protected SaslServer createSaslServer(String mechanism) throws SaslException
+    {
+        return _appRegistry.getAuthenticationManager().createSaslServer(mechanism, _localFQDN);
+
+    }
+
+
+    @Override public void connectionOpen(Connection conn, ConnectionOpen open)
+    {
+        ServerConnection sconn = (ServerConnection) conn;
+
+        VirtualHost vhost;
+        String vhostName;
+        if(open.hasVirtualHost())
+        {
+            vhostName = open.getVirtualHost();
+        }
+        else
+        {
+            vhostName = "";
+        }
+        vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
+
+        if(vhost != null)
+        {
+            sconn.setVirtualHost(vhost);
+
+            sconn.invoke(new ConnectionOpenOk(Collections.EMPTY_LIST));
+
+            sconn.setState(Connection.State.OPEN);
+        }
+        else
+        {
+            sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown vistrulhost '"+vhostName+"'"));
+            sconn.setState(Connection.State.CLOSING);
+        }
+
+    }
+}

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=795958&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Jul 20 19:05:05 2009
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.*;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.AMQException;
+
+import java.util.ArrayList;
+
+public class ServerSession extends Session
+{
+    ServerSession(Connection connection, Binary name, long expiry)
+    {
+        super(connection, name, expiry);
+    }
+
+    ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
+    {
+        super(connection, delegate, name, expiry);
+    }
+
+    public void enqueue(ServerMessage message, ArrayList<AMQQueue> queues)
+    {
+        // TODO Txn
+
+        try
+        {
+            for(AMQQueue q : queues)
+            {
+                q.enqueue(message);
+            }
+        }
+        catch (AMQException e)
+        {
+            // TODO
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+    }
+}

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=795958&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Mon Jul 20 19:05:05 2009
@@ -0,0 +1,402 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.*;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.AMQException;
+
+import java.util.ArrayList;
+
+public class ServerSessionDelegate extends SessionDelegate
+{
+    private final IApplicationRegistry _appRegistry;
+
+    public ServerSessionDelegate(IApplicationRegistry appRegistry)
+    {
+        _appRegistry = appRegistry;
+    }
+
+    @Override
+    public void messageAccept(Session session, MessageAccept method)
+    {
+        super.messageAccept(session, method);
+    }
+
+    @Override
+    public void messageReject(Session session, MessageReject method)
+    {
+        super.messageReject(session, method);
+    }
+
+    @Override
+    public void messageRelease(Session session, MessageRelease method)
+    {
+        super.messageRelease(session, method);
+    }
+
+    @Override
+    public void messageAcquire(Session session, MessageAcquire method)
+    {
+        super.messageAcquire(session, method);
+    }
+
+    @Override
+    public void messageResume(Session session, MessageResume method)
+    {
+        super.messageResume(session, method);
+    }
+
+    @Override
+    public void messageSubscribe(Session session, MessageSubscribe method)
+    {
+        super.messageSubscribe(session, method);
+    }
+
+
+    @Override
+    public void messageTransfer(Session ssn, MessageTransfer xfr)
+    {
+        ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
+        Exchange exchange;
+        if(xfr.hasDestination())
+        {
+            exchange = exchangeRegistry.getExchange(xfr.getDestination());
+        }
+        else
+        {
+            exchange = exchangeRegistry.getDefaultExchange();
+        }
+
+        MessageTransferMessage message = new MessageTransferMessage(xfr);
+        try
+        {
+            ArrayList<AMQQueue> queues = exchange.route(message);
+
+            ((ServerSession) ssn).enqueue(message, queues);
+
+
+            System.out.println(queues);
+
+            ssn.processed(xfr);
+        }
+        catch (AMQException e)
+        {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
+
+
+        super.messageTransfer(ssn, xfr);    //To change body of overridden methods use File | Settings | File Templates.
+    }
+
+    @Override
+    public void messageCancel(Session session, MessageCancel method)
+    {
+        super.messageCancel(session, method);
+    }
+
+    @Override
+    public void messageFlush(Session session, MessageFlush method)
+    {
+        super.messageFlush(session, method);
+    }
+
+    @Override
+    public void txSelect(Session session, TxSelect method)
+    {
+        super.txSelect(session, method);
+    }
+
+    @Override
+    public void txCommit(Session session, TxCommit method)
+    {
+        super.txCommit(session, method);
+    }
+
+    @Override
+    public void txRollback(Session session, TxRollback method)
+    {
+        super.txRollback(session, method);
+    }
+
+    @Override
+    public void dtxSelect(Session session, DtxSelect method)
+    {
+        super.dtxSelect(session, method);
+    }
+
+    @Override
+    public void dtxStart(Session session, DtxStart method)
+    {
+        super.dtxStart(session, method);
+    }
+
+    @Override
+    public void dtxEnd(Session session, DtxEnd method)
+    {
+        super.dtxEnd(session, method);
+    }
+
+    @Override
+    public void dtxCommit(Session session, DtxCommit method)
+    {
+        super.dtxCommit(session, method);
+    }
+
+    @Override
+    public void dtxForget(Session session, DtxForget method)
+    {
+        super.dtxForget(session, method);
+    }
+
+    @Override
+    public void dtxGetTimeout(Session session, DtxGetTimeout method)
+    {
+        super.dtxGetTimeout(session, method);
+    }
+
+    @Override
+    public void dtxPrepare(Session session, DtxPrepare method)
+    {
+        super.dtxPrepare(session, method);
+    }
+
+    @Override
+    public void dtxRecover(Session session, DtxRecover method)
+    {
+        super.dtxRecover(session, method);
+    }
+
+    @Override
+    public void dtxRollback(Session session, DtxRollback method)
+    {
+        super.dtxRollback(session, method);
+    }
+
+    @Override
+    public void dtxSetTimeout(Session session, DtxSetTimeout method)
+    {
+        super.dtxSetTimeout(session, method);
+    }
+
+    @Override
+    public void exchangeDeclare(Session session, ExchangeDeclare method)
+    {
+        String exchangeName = method.getExchange();
+
+        Exchange exchange = getExchange(session, exchangeName);
+
+        if(method.getPassive())
+        {
+            if(exchange == null)
+            {
+                ExecutionException ex = new ExecutionException();
+                ex.setErrorCode(ExecutionErrorCode.NOT_FOUND);
+                ex.setCommandId(method.getId());
+
+                ex.setDescription("not-found: exchange-name '"+exchangeName+"'");
+
+                session.invoke(ex);
+                session.close();
+            }
+
+        }
+        else
+        {
+            // TODO
+        }
+        super.exchangeDeclare(session, method);
+    }
+
+    private Exchange getExchange(Session session, String exchangeName)
+    {
+        ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
+        return exchangeRegistry.getExchange(exchangeName);
+    }
+
+    private ExchangeRegistry getExchangeRegistry(Session session)
+    {
+        VirtualHost virtualHost = getVirtualHost(session);
+        return virtualHost.getExchangeRegistry();
+
+    }
+
+    private VirtualHost getVirtualHost(Session session)
+    {
+        ServerConnection conn = getServerConnection(session);
+        VirtualHost vhost = conn.getVirtualHost();
+        return vhost;
+    }
+
+    private ServerConnection getServerConnection(Session session)
+    {
+        ServerConnection conn = (ServerConnection) session.getConnection();
+        return conn;
+    }
+
+    @Override
+    public void exchangeDelete(Session session, ExchangeDelete method)
+    {
+        super.exchangeDelete(session, method);
+    }
+
+    @Override
+    public void exchangeQuery(Session session, ExchangeQuery method)
+    {
+        super.exchangeQuery(session, method);
+
+    }
+
+    @Override
+    public void exchangeBind(Session session, ExchangeBind method)
+    {
+        super.exchangeBind(session, method);
+    }
+
+    @Override
+    public void exchangeUnbind(Session session, ExchangeUnbind method)
+    {
+        super.exchangeUnbind(session, method);
+    }
+
+    @Override
+    public void exchangeBound(Session session, ExchangeBound method)
+    {
+
+
+        ExchangeBoundResult result = new ExchangeBoundResult();
+        if(method.hasExchange())
+        {
+            Exchange exchange = getExchange(session, method.getExchange());
+
+            if(exchange == null)
+            {
+                result.setExchangeNotFound(true);
+            }
+
+            if(method.hasQueue())
+            {
+
+                AMQQueue queue = getQueue(session, method.getQueue());
+                if(queue == null)
+                {
+                    result.setQueueNotFound(true);
+                }
+
+                if(exchange != null && queue != null)
+                {
+
+                    if(method.hasBindingKey())
+                    {
+
+                        if(method.hasArguments())
+                        {
+                            // TODO
+                        }
+                        result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue));
+
+                    }
+
+                    result.setQueueNotMatched(!exchange.isBound(queue));
+
+                }
+            }
+            else if(exchange != null && method.hasBindingKey())
+            {
+                if(method.hasArguments())
+                {
+                    // TODO
+                }
+                result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
+
+            }
+
+        }
+        else if(method.hasQueue())
+        {
+            AMQQueue queue = getQueue(session, method.getQueue());
+            if(queue == null)
+            {
+                result.setQueueNotFound(true);
+            }
+            else
+            {
+                if(method.hasBindingKey())
+                {
+                    if(method.hasArguments())
+                    {
+                        // TODO
+                    }
+
+                    // TODO
+                }
+            }
+
+        }
+
+
+        session.executionResult((int) method.getId(), result);
+        super.exchangeBound(session, method);
+    }
+
+    private AMQQueue getQueue(Session session, String queue)
+    {
+        QueueRegistry queueRegistry = getQueueRegistry(session);
+        return queueRegistry.getQueue(queue);
+    }
+
+    private QueueRegistry getQueueRegistry(Session session)
+    {
+        return getVirtualHost(session).getQueueRegistry();
+    }
+
+    @Override
+    public void queueDeclare(Session session, QueueDeclare method)
+    {
+        super.queueDeclare(session, method);
+    }
+
+    @Override
+    public void queueDelete(Session session, QueueDelete method)
+    {
+        super.queueDelete(session, method);
+    }
+
+    @Override
+    public void queuePurge(Session session, QueuePurge method)
+    {
+        super.queuePurge(session, method);
+    }
+
+    @Override
+    public void queueQuery(Session session, QueueQuery method)
+    {
+        super.queueQuery(session, method);
+    }
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message