activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4092 - resolve by removing dynamic property modification - JMSXGroupFirstForConsumer is now a message attribute with a property accessor
Date Tue, 17 Sep 2013 10:04:10 GMT
Updated Branches:
  refs/heads/trunk d771ebb97 -> dd91e8592


https://issues.apache.org/jira/browse/AMQ-4092 - resolve by removing dynamic property modification
- JMSXGroupFirstForConsumer is now a message attribute with a property accessor


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dd91e859
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dd91e859
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dd91e859

Branch: refs/heads/trunk
Commit: dd91e8592e6890a8863884250c7fe5ed1d8a85d7
Parents: d771ebb
Author: gtully <gary.tully@gmail.com>
Authored: Tue Sep 17 11:03:18 2013 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Tue Sep 17 11:03:57 2013 +0100

----------------------------------------------------------------------
 .../org/apache/activemq/broker/region/Queue.java   |  9 +--------
 .../activemq/broker/region/QueueSubscription.java  | 17 -----------------
 .../java/org/apache/activemq/command/Message.java  | 13 +++++++++++++
 .../apache/activemq/filter/PropertyExpression.java |  7 +++++++
 .../activemq/openwire/v10/MessageMarshaller.java   |  5 +++++
 5 files changed, 26 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/dd91e859/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 27ea50d..53f686f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -2109,14 +2109,7 @@ public class Queue extends BaseDestination implements Task, UsageListener
{
     protected void assignGroup(Subscription subs, MessageGroupMap messageGroupOwners, MessageReference
n, String groupId) throws IOException {
         messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
         Message message = n.getMessage();
-        if (message instanceof ActiveMQMessage) {
-            ActiveMQMessage activeMessage = (ActiveMQMessage) message;
-            try {
-                activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
-            } catch (JMSException e) {
-                LOG.warn("Failed to set boolean header", e);
-            }
-        }
+        message.setJMSXGroupFirstForConsumer(true);
         subs.getConsumerInfo().setLastDeliveredSequenceId(subs.getConsumerInfo().getLastDeliveredSequenceId()
+ 1);
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/dd91e859/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
index e77714f..7c7027f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
@@ -74,23 +74,6 @@ public class QueueSubscription extends PrefetchSubscription implements
LockOwner
         return result;
     }
 
-    /**
-     * Assigns the message group to this subscription and set the flag on the
-     * message that it is the first message to be dispatched.
-     */
-    protected void assignGroupToMe(MessageGroupMap messageGroupOwners, MessageReference n,
String groupId) throws IOException {
-        messageGroupOwners.put(groupId, info.getConsumerId());
-        Message message = n.getMessage();
-        if (message instanceof ActiveMQMessage) {
-            ActiveMQMessage activeMessage = (ActiveMQMessage)message;
-            try {
-                activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
-            } catch (JMSException e) {
-                LOG.warn("Failed to set boolean header: " + e, e);
-            }
-        }
-    }
-
     @Override
     public synchronized String toString() {
         return "QueueSubscription:" + " consumer=" + info.getConsumerId() + ", destinations="
+ destinations.size() + ", dispatched=" + dispatched.size() + ", delivered="

http://git-wip-us.apache.org/repos/asf/activemq/blob/dd91e859/activemq-client/src/main/java/org/apache/activemq/command/Message.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/Message.java b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
index b3df690..e0f0b21 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
@@ -87,6 +87,7 @@ public abstract class Message extends BaseCommand implements MarshallAware,
Mess
     protected boolean readOnlyBody;
     protected transient boolean recievedByDFBridge;
     protected boolean droppable;
+    protected boolean jmsXGroupFirstForConsumer;
 
     private transient short referenceCount;
     private transient ActiveMQConnection connection;
@@ -156,6 +157,7 @@ public abstract class Message extends BaseCommand implements MarshallAware,
Mess
         copy.brokerOutTime = brokerOutTime;
         copy.memoryUsage=this.memoryUsage;
         copy.brokerPath = brokerPath;
+        copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer;
 
         // lets not copy the following fields
         // copy.targetConsumerId = targetConsumerId;
@@ -781,6 +783,17 @@ public abstract class Message extends BaseCommand implements MarshallAware,
Mess
         return false;
     }
 
+    /**
+     * @openwire:property version=10
+     */
+    public boolean isJMSXGroupFirstForConsumer() {
+        return jmsXGroupFirstForConsumer;
+    }
+
+    public void setJMSXGroupFirstForConsumer(boolean val) {
+        jmsXGroupFirstForConsumer = val;
+    }
+
     public void compress() throws IOException {
         if (!isCompressed()) {
             storeContent();

http://git-wip-us.apache.org/repos/asf/activemq/blob/dd91e859/activemq-client/src/main/java/org/apache/activemq/filter/PropertyExpression.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/filter/PropertyExpression.java
b/activemq-client/src/main/java/org/apache/activemq/filter/PropertyExpression.java
index c797319..964a81d 100755
--- a/activemq-client/src/main/java/org/apache/activemq/filter/PropertyExpression.java
+++ b/activemq-client/src/main/java/org/apache/activemq/filter/PropertyExpression.java
@@ -195,6 +195,13 @@ public class PropertyExpression implements Expression {
                 return Arrays.toString(message.getBrokerPath());
             }
         });
+        JMS_PROPERTY_EXPRESSIONS.put("JMSXGroupFirstForConsumer", new SubExpression() {
+
+            @Override
+            public Object evaluate(Message message) {
+                return Boolean.valueOf(message.isJMSXGroupFirstForConsumer());
+            }
+        });
     }
 
     private final String name;

http://git-wip-us.apache.org/repos/asf/activemq/blob/dd91e859/activemq-client/src/main/java/org/apache/activemq/openwire/v10/MessageMarshaller.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/openwire/v10/MessageMarshaller.java
b/activemq-client/src/main/java/org/apache/activemq/openwire/v10/MessageMarshaller.java
index 95fda28..262faff 100644
--- a/activemq-client/src/main/java/org/apache/activemq/openwire/v10/MessageMarshaller.java
+++ b/activemq-client/src/main/java/org/apache/activemq/openwire/v10/MessageMarshaller.java
@@ -105,6 +105,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller
{
         }
         info.setBrokerInTime(tightUnmarshalLong(wireFormat, dataIn, bs));
         info.setBrokerOutTime(tightUnmarshalLong(wireFormat, dataIn, bs));
+        info.setJMSXGroupFirstForConsumer(bs.readBoolean());
 
         info.afterUnmarshall(wireFormat);
 
@@ -147,6 +148,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller
{
         rc += tightMarshalObjectArray1(wireFormat, info.getCluster(), bs);
         rc+=tightMarshalLong1(wireFormat, info.getBrokerInTime(), bs);
         rc+=tightMarshalLong1(wireFormat, info.getBrokerOutTime(), bs);
+        bs.writeBoolean(info.isJMSXGroupFirstForConsumer());
 
         return rc + 9;
     }
@@ -191,6 +193,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller
{
         tightMarshalObjectArray2(wireFormat, info.getCluster(), dataOut, bs);
         tightMarshalLong2(wireFormat, info.getBrokerInTime(), dataOut, bs);
         tightMarshalLong2(wireFormat, info.getBrokerOutTime(), dataOut, bs);
+        bs.readBoolean();
 
         info.afterMarshall(wireFormat);
 
@@ -261,6 +264,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller
{
         }
         info.setBrokerInTime(looseUnmarshalLong(wireFormat, dataIn));
         info.setBrokerOutTime(looseUnmarshalLong(wireFormat, dataIn));
+        info.setJMSXGroupFirstForConsumer(dataIn.readBoolean());
 
         info.afterUnmarshall(wireFormat);
 
@@ -306,6 +310,7 @@ public abstract class MessageMarshaller extends BaseCommandMarshaller
{
         looseMarshalObjectArray(wireFormat, info.getCluster(), dataOut);
         looseMarshalLong(wireFormat, info.getBrokerInTime(), dataOut);
         looseMarshalLong(wireFormat, info.getBrokerOutTime(), dataOut);
+        dataOut.writeBoolean(info.isJMSXGroupFirstForConsumer());
 
     }
 }


Mime
View raw message