qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1772532 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/message/internal/ broker-core/src/main/jav...
Date Sun, 04 Dec 2016 13:20:58 GMT
Author: rgodfrey
Date: Sun Dec  4 13:20:58 2016
New Revision: 1772532

URL: http://svn.apache.org/viewvc?rev=1772532&view=rev
Log:
QPID-7568 : Update support for delayed delivery in AMQP 1.0

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
Sun Dec  4 13:20:58 2016
@@ -461,6 +461,7 @@ public abstract class AbstractExchange<T
         }
 
         List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
+
         if(queues == null || queues.isEmpty())
         {
             Exchange altExchange = getAlternateExchange();
@@ -475,6 +476,13 @@ public abstract class AbstractExchange<T
         }
         else
         {
+            for(BaseQueue q : queues)
+            {
+                if(!message.isResourceAcceptable(q))
+                {
+                    return 0;
+                }
+            }
             final BaseQueue[] baseQueues;
 
             if(message.isReferenced())

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
Sun Dec  4 13:20:58 2016
@@ -49,4 +49,6 @@ public interface ServerMessage<T extends
     long getArrivalTime();
 
     Object getConnectionReference();
+
+    boolean isResourceAcceptable(TransactionLogResource resource);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
Sun Dec  4 13:20:58 2016
@@ -38,6 +38,7 @@ import org.apache.qpid.server.message.Ab
 import org.apache.qpid.server.store.MessageHandle;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.util.ByteBufferInputStream;
 import org.apache.qpid.util.ByteBufferUtils;
@@ -104,6 +105,12 @@ public class InternalMessage extends Abs
         return _header.getArrivalTime();
     }
 
+    @Override
+    public boolean isResourceAcceptable(final TransactionLogResource resource)
+    {
+        return true;
+    }
+
     public Object getMessageBody()
     {
         return _messageBody;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Sun Dec  4 13:20:58 2016
@@ -2604,8 +2604,7 @@ public abstract class AbstractQueue<X ex
         {
             throw new VirtualHostUnavailableException(this._virtualHost);
         }
-
-        if(!message.isReferenced(this))
+        if(message.isResourceAcceptable(this) && !message.isReferenced(this))
         {
             txn.enqueue(this, message, new ServerTransaction.EnqueueAction()
             {

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
(original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
Sun Dec  4 13:20:58 2016
@@ -112,6 +112,12 @@ public class TestMessageMetaDataType imp
         }
 
         @Override
+        public boolean isResourceAcceptable(final TransactionLogResource resource)
+        {
+            return true;
+        }
+
+        @Override
         public long getExpiration()
         {
             return 0;

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
(original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
Sun Dec  4 13:20:58 2016
@@ -121,6 +121,12 @@ class MockServerMessage implements Serve
     }
 
     @Override
+    public boolean isResourceAcceptable(final TransactionLogResource resource)
+    {
+        return true;
+    }
+
+    @Override
     public long getArrivalTime()
     {
         throw new UnsupportedOperationException();

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
Sun Dec  4 13:20:58 2016
@@ -26,6 +26,7 @@ import org.apache.qpid.bytebuffer.QpidBy
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.transport.Header;
 
 
@@ -72,6 +73,12 @@ public class MessageTransferMessage exte
         return getMetaData().getArrivalTime();
     }
 
+    @Override
+    public boolean isResourceAcceptable(final TransactionLogResource resource)
+    {
+        return true;
+    }
+
     public Header getHeader()
     {
         return getMetaData().getHeader();

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
Sun Dec  4 13:20:58 2016
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.MessagePu
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
 
 /**
  * A deliverable message.
@@ -86,6 +87,12 @@ public class AMQMessage extends Abstract
         return getMessageMetaData().getArrivalTime();
     }
 
+    @Override
+    public boolean isResourceAcceptable(final TransactionLogResource resource)
+    {
+        return true;
+    }
+
     public boolean isImmediate()
     {
         return getMessagePublishInfo().isImmediate();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
Sun Dec  4 13:20:58 2016
@@ -215,8 +215,9 @@ public class ExchangeDestination impleme
     @Override
     public Symbol[] getCapabilities()
     {
-        Symbol[] capabilities = new Symbol[1];
+        Symbol[] capabilities = new Symbol[2];
         capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE : REJECT_UNROUTABLE;
+        capabilities[1] = DELAYED_DELIVERY;
         return capabilities;
     }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
Sun Dec  4 13:20:58 2016
@@ -59,10 +59,13 @@ import org.apache.qpid.server.util.Conne
 public class MessageMetaData_1_0 implements StorableMessageMetaData
 {
     private static final Logger _logger = LoggerFactory.getLogger(MessageMetaData_1_0.class);
-    // TODO move to somewhere more useful
-    public static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
-    public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory();
     private static final MessageMetaDataType_1_0 TYPE = new MessageMetaDataType_1_0();
+    public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory();
+
+    // TODO move to somewhere more useful
+    private static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
+    private static final Symbol DELIVERY_TIME = Symbol.valueOf("x-opt-delivery-time");
+    private static final Symbol NOT_VALID_BEFORE = Symbol.valueOf("x-qpid-not-valid-before");
 
 
     private Header _header;
@@ -529,7 +532,12 @@ public class MessageMetaData_1_0 impleme
         {
             long notValidBefore;
             Object annotation;
-            if(_messageAnnotations != null && (annotation = _messageAnnotations.get(Symbol.valueOf("x-qpid-not-valid-before"))) instanceof Number)
+
+            if(_messageAnnotations != null && (annotation = _messageAnnotations.get(DELIVERY_TIME)) instanceof Number)
+            {
+                notValidBefore = ((Number)annotation).longValue();
+            }
+            else if(_messageAnnotations != null && (annotation = _messageAnnotations.get(NOT_VALID_BEFORE)) instanceof Number)
             {
                 notValidBefore = ((Number)annotation).longValue();
             }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
Sun Dec  4 13:20:58 2016
@@ -24,12 +24,14 @@ package org.apache.qpid.server.protocol.
 import java.util.Collection;
 import java.util.Collections;
 
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
 import org.apache.qpid.server.protocol.v1_0.type.Section;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
 
 public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
 {
@@ -96,6 +98,19 @@ public class Message_1_0 extends Abstrac
         return _arrivalTime;
     }
 
+
+    @Override
+    public boolean isResourceAcceptable(final TransactionLogResource resource)
+    {
+        return getMessageHeader().getNotValidBefore() != 0L && !resourceSupportsDeliveryDelay(resource);
+    }
+
+    private boolean resourceSupportsDeliveryDelay(final TransactionLogResource resource)
+    {
+        return resource instanceof Queue && ((Queue<?>)resource).isHoldOnPublishEnabled();
+    }
+
+
     public Collection<QpidByteBuffer> getFragments()
     {
         return getContent(0, (int) getSize());

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
Sun Dec  4 13:20:58 2016
@@ -192,8 +192,9 @@ public class NodeReceivingDestination im
     @Override
     public Symbol[] getCapabilities()
     {
-        Symbol[] capabilities = new Symbol[1];
+        Symbol[] capabilities = new Symbol[2];
         capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE : REJECT_UNROUTABLE;
+        capabilities[1] = DELAYED_DELIVERY;
         return capabilities;
     }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
Sun Dec  4 13:20:58 2016
@@ -20,11 +20,15 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -112,4 +116,15 @@ public class QueueDestination extends Me
     {
         return _address;
     }
+
+    @Override
+    public Symbol[] getCapabilities()
+    {
+        Set<Symbol> capabilities = new HashSet<>(Arrays.asList(super.getCapabilities()));
+        if(_queue.isHoldOnPublishEnabled())
+        {
+            capabilities.add(DELAYED_DELIVERY);
+        }
+        return capabilities.toArray(new Symbol[capabilities.size()]);
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
Sun Dec  4 13:20:58 2016
@@ -31,6 +31,8 @@ public interface ReceivingDestination ex
 
     Symbol REJECT_UNROUTABLE = Symbol.valueOf("REJECT_UNROUTABLE");
     Symbol DISCARD_UNROUTABLE = Symbol.valueOf("DISCARD_UNROUTABLE");
+    Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
+
 
     Outcome[] getOutcomes();
 

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
(original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
Sun Dec  4 13:20:58 2016
@@ -332,31 +332,37 @@ class ManagementNode implements MessageS
                                                                                   final ServerTransaction txn,
                                                                                   final Action<? super MessageInstance> postEnqueueAction)
     {
-
-        @SuppressWarnings("unchecked")
-        MessageConverter<M, InternalMessage> converter =
-                MessageConverterRegistry.getConverter(((Class<M>)message.getClass()), InternalMessage.class);
+        if(message.isResourceAcceptable(this))
+        {
+            @SuppressWarnings("unchecked")
+            MessageConverter<M, InternalMessage> converter =
+                    MessageConverterRegistry.getConverter(((Class<M>) message.getClass()), InternalMessage.class);
 
 
-        if(converter != null)
-        {
-            final InternalMessage msg = converter.convert(message, _addressSpace);
-            txn.addPostTransactionAction(new ServerTransaction.Action()
+            if (converter != null)
             {
-                @Override
-                public void postCommit()
-                {
-                    enqueue(msg, instanceProperties, postEnqueueAction);
-                }
-
-                @Override
-                public void onRollback()
+                final InternalMessage msg = converter.convert(message, _addressSpace);
+                txn.addPostTransactionAction(new ServerTransaction.Action()
                 {
-
-                }
-            });
-
-            return 1;
+                    @Override
+                    public void postCommit()
+                    {
+                        enqueue(msg, instanceProperties, postEnqueueAction);
+                    }
+
+                    @Override
+                    public void onRollback()
+                    {
+
+                    }
+                });
+
+                return 1;
+            }
+            else
+            {
+                return 0;
+            }
         }
         else
         {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message