activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5513 - interaction with - https://issues.apache.org/jira/browse/AMQ-5068 - need to ensure broker cached messages state reflects delivery attempt - RedeliveryRestartWithExceptionTest regression
Date Mon, 26 Jan 2015 12:14:20 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk d5470254a -> 6cebd2c79


https://issues.apache.org/jira/browse/AMQ-5513 - interaction with - https://issues.apache.org/jira/browse/AMQ-5068
- need to ensure broker cached messages state reflects delivery attempt - RedeliveryRestartWithExceptionTest
regression


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6cebd2c7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6cebd2c7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6cebd2c7

Branch: refs/heads/trunk
Commit: 6cebd2c79e939ae45b4cd94b6433b9d96d2bae8d
Parents: d547025
Author: gtully <gary.tully@gmail.com>
Authored: Mon Jan 26 12:13:45 2015 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Mon Jan 26 12:13:55 2015 +0000

----------------------------------------------------------------------
 .../activemq/broker/region/RegionBroker.java    | 24 ++++++++++++++++++--
 .../RedeliveryRestartWithExceptionTest.java     |  4 ++--
 2 files changed, 24 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6cebd2c7/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 658bc7c..2943c98 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -66,6 +66,7 @@ import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.store.PListStore;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transport.TransmitCallback;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.activemq.util.IdGenerator;
@@ -613,8 +614,8 @@ public class RegionBroker extends EmptyBroker {
     }
 
     @Override
-    public void preProcessDispatch(MessageDispatch messageDispatch) {
-        Message message = messageDispatch.getMessage();
+    public void preProcessDispatch(final MessageDispatch messageDispatch) {
+        final Message message = messageDispatch.getMessage();
         if (message != null) {
             long endTime = System.currentTimeMillis();
             message.setBrokerOutTime(endTime);
@@ -627,6 +628,25 @@ public class RegionBroker extends EmptyBroker {
                 message.incrementRedeliveryCounter();
                 try {
                     ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
+                    messageDispatch.setTransmitCallback(new TransmitCallback() {
+                        // dispatch is considered a delivery, so update sub state post dispatch otherwise
+                        // on a disconnect/reconnect cached messages will not reflect initial delivery attempt
+                        final TransmitCallback delegate = messageDispatch.getTransmitCallback();
+                        @Override
+                        public void onSuccess() {
+                            message.incrementRedeliveryCounter();
+                            if (delegate != null) {
+                                delegate.onSuccess();
+                            }
+                        }
+
+                        @Override
+                        public void onFailure() {
+                            if (delegate != null) {
+                                delegate.onFailure();
+                            }
+                        }
+                    });
                 } catch (IOException error) {
                     RuntimeException runtimeException = new RuntimeException("Failed to persist JMSRedeliveryFlag on " + message.getMessageId() + " in " + message.getDestination(), error);
                     LOG.warn(runtimeException.getLocalizedMessage(), runtimeException);

http://git-wip-us.apache.org/repos/asf/activemq/blob/6cebd2c7/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
index 4126f06..eae86d6 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
@@ -200,8 +200,8 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport {
             msg = (TextMessage) consumer.receive(4000);
             LOG.info("redelivered? got: " + msg);
             assertNotNull("got the message again", msg);
-            assertEquals("re delivery flag", true, msg.getJMSRedelivered());
-            assertTrue("redelivery count survives reconnect", msg.getLongProperty("JMSXDeliveryCount") > 1);
+            assertEquals("re delivery flag on:" + i, true, msg.getJMSRedelivered());
+            assertTrue("redelivery count survives reconnect for:" + i, msg.getLongProperty("JMSXDeliveryCount") > 1);
             msg.acknowledge();
         }
 


Mime
View raw message