activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: revisit https://issues.apache.org/jira/browse/AMQ-3519 with more durable solution https://issues.apache.org/jira/browse/AMQ-5068 - JMSRedelivered header now persisted before dispatch so that it is a reliable indication of a possible duplicate
Date Wed, 26 Mar 2014 11:16:57 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 751fc2363 -> 266d23ef7


revisit https://issues.apache.org/jira/browse/AMQ-3519 with more durable solution https://issues.apache.org/jira/browse/AMQ-5068 - JMSRedelivered header now persisted before dispatch so that it is a reliable indication of a possible duplicate delivery. The option is enabled via destination policy persistJMSRedelivered


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/266d23ef
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/266d23ef
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/266d23ef

Branch: refs/heads/trunk
Commit: 266d23ef794e79bb48f81314295640421772808c
Parents: 751fc23
Author: gtully <gary.tully@gmail.com>
Authored: Wed Mar 26 11:14:35 2014 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Mar 26 11:16:42 2014 +0000

----------------------------------------------------------------------
 .../activemq/broker/region/BaseDestination.java |   8 +
 .../activemq/broker/region/QueueRegion.java     |   1 +
 .../activemq/broker/region/RegionBroker.java    |  11 ++
 .../broker/region/policy/PolicyEntry.java       |   9 ++
 .../activemq/store/AbstractMessageStore.java    |   4 +
 .../org/apache/activemq/store/MessageStore.java |   1 +
 .../activemq/store/ProxyMessageStore.java       |   5 +
 .../activemq/store/ProxyTopicMessageStore.java  |   4 +
 .../store/memory/MemoryMessageStore.java        |   6 +
 .../apache/activemq/store/jdbc/JDBCAdapter.java |   3 +
 .../activemq/store/jdbc/JDBCMessageStore.java   |  15 +-
 .../apache/activemq/store/jdbc/Statements.java  |   2 +-
 .../store/jdbc/adapter/DefaultJDBCAdapter.java  |  18 +++
 .../store/kahadb/KahaDBPersistenceAdapter.java  |  12 --
 .../activemq/store/kahadb/KahaDBStore.java      |  68 ++++-----
 .../activemq/store/kahadb/MessageDatabase.java  |  73 +++++----
 .../apache/activemq/store/kahadb/Visitor.java   |   4 +
 .../src/main/proto/journal-data.proto           |   8 +
 .../apache/activemq/leveldb/LevelDBStore.scala  |   7 +-
 .../java/org/apache/activemq/TestSupport.java   |  16 ++
 .../broker/BrokerRestartTestSupport.java        |   1 +
 .../broker/LevelDBRedeliveryRestartTest.java    |  49 ------
 .../activemq/broker/RedeliveryRestartTest.java  | 151 ++++++++++++++-----
 .../FailoverRedeliveryTransactionTest.java      |  23 +--
 .../activemq/usecases/MemoryLimitTest.java      |   2 +-
 25 files changed, 313 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 3f925b4..c3841c8 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -106,6 +106,7 @@ public abstract class BaseDestination implements Destination {
      * percentage of in-flight messages above which optimize message store is disabled
      */
     private int optimizeMessageStoreInFlightLimit = 10;
+    private boolean persistJMSRedelivered;
 
     /**
      * @param brokerService
@@ -807,4 +808,11 @@ public abstract class BaseDestination implements Destination {
         }
     }
 
+    public void setPersistJMSRedelivered(boolean persistJMSRedelivered) {
+        this.persistJMSRedelivered = persistJMSRedelivered;
+    }
+
+    public boolean isPersistJMSRedelivered() {
+        return persistJMSRedelivered;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
index 05aa633..f300a13 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
@@ -25,6 +25,7 @@ import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.usage.SystemUsage;

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 4d54753..dd0e63b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -596,6 +596,17 @@ public class RegionBroker extends EmptyBroker {
                 long totalTime = endTime - message.getBrokerInTime();
                 ((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
             }
+            if (((BaseDestination) message.getRegionDestination()).isPersistJMSRedelivered() && !message.isRedelivered() && message.isPersistent()) {
+                final int originalValue = message.getRedeliveryCounter();
+                message.incrementRedeliveryCounter();
+                try {
+                    ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
+                } catch (IOException error) {
+                    LOG.error("Failed to persist JMSRedeliveryFlag on {} in {}", message.getMessageId(), message.getDestination(), error);
+                } finally {
+                    message.setRedeliveryCounter(originalValue);
+                }
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index 9e1b006..624d490 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -103,6 +103,7 @@ public class PolicyEntry extends DestinationMapEntry {
      * percentage of in-flight messages above which optimize message store is disabled
      */
     private int optimizeMessageStoreInFlightLimit = 10;
+    private boolean persistJMSRedelivered = false;
 
 
     public void configure(Broker broker,Queue queue) {
@@ -196,6 +197,7 @@ public class PolicyEntry extends DestinationMapEntry {
         destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
         destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
         destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
+        destination.setPersistJMSRedelivered(isPersistJMSRedelivered());
     }
 
     public void baseConfiguration(Broker broker, BaseDestination destination) {
@@ -920,4 +922,11 @@ public class PolicyEntry extends DestinationMapEntry {
         this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
     }
 
+    public void setPersistJMSRedelivered(boolean val) {
+        this.persistJMSRedelivered = val;
+    }
+
+    public boolean isPersistJMSRedelivered() {
+        return persistJMSRedelivered;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
index ba9efde..cd8d0f9 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
@@ -117,6 +117,10 @@ abstract public class AbstractMessageStore implements MessageStore {
         removeMessage(context, ack);
     }
 
+    public void updateMessage(Message message) throws IOException {
+        throw new IOException("update is not supported by: " + this);
+    }
+
     static class CallableImplementation implements Callable<Object> {
         public Object call() throws Exception {
             return null;

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
index 6dac804..d465bc5 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
@@ -195,4 +195,5 @@ public interface MessageStore extends Service {
      */
     public boolean isPrioritizedMessages();
 
+    void updateMessage(Message message) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
index ce1dd06..e79229b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
@@ -155,4 +155,9 @@ public class ProxyMessageStore implements MessageStore {
     public boolean isPrioritizedMessages() {
         return delegate.isPrioritizedMessages();
     }
+
+    @Override
+    public void updateMessage(Message message) throws IOException {
+        delegate.updateMessage(message);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
index 8487650..c0635fa 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
@@ -204,4 +204,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
     public boolean isPrioritizedMessages() {
         return delegate.isPrioritizedMessages();
     }
+
+    public void updateMessage(Message message) throws IOException {
+        delegate.updateMessage(message);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index 5ad6a32..836b388 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -152,5 +152,11 @@ public class MemoryMessageStore extends AbstractMessageStore {
     public void setBatch(MessageId messageId) {
         lastBatchId = messageId;
     }
+
+    public void updateMessage(Message message) {
+        synchronized (messageTable) {
+            messageTable.put(message.getMessageId(), message);
+        }
+    }
     
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
index 19ba585..912808d 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
 import java.util.Set;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.SubscriptionInfo;
@@ -107,4 +108,6 @@ public interface JDBCAdapter {
     void doCommitAddOp(TransactionContext c, long storeSequenceIdForMessageId) throws SQLException, IOException;
 
     void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String subId, String subName) throws SQLException, IOException;
+
+    void doUpdateMessage(TransactionContext c, ActiveMQDestination destination, MessageId id, byte[] data) throws SQLException, IOException;
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 0ee9823..3c441b0 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -138,6 +138,19 @@ public class JDBCMessageStore extends AbstractMessageStore {
         }
     }
 
+    @Override
+    public void updateMessage(Message message) throws IOException {
+        TransactionContext c = persistenceAdapter.getTransactionContext();
+        try {
+            adapter.doUpdateMessage(c, destination, message.getMessageId(), ByteSequenceData.toByteArray(wireFormat.marshal(message)));
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to update message: " + message.getMessageId() + " in container: " + e, e);
+        } finally {
+            c.close();
+        }
+    }
+
     protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
         if (lastRecoveredSequenceId.get() > 0 && sequenceId < lastRecoveredSequenceId.get()) {
             recoveredAdditions.add(sequenceId);
@@ -353,5 +366,5 @@ public class JDBCMessageStore extends AbstractMessageStore {
 
     public void setPrioritizedMessages(boolean prioritizedMessages) {
         super.setPrioritizedMessages(prioritizedMessages);
-    }   
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
index 5df6f2e..fc80465 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
@@ -169,7 +169,7 @@ public class Statements {
 
     public String getUpdateMessageStatement() {
         if (updateMessageStatement == null) {
-            updateMessageStatement = "UPDATE " + getFullMessageTableName() + " SET MSG=? WHERE ID=?";
+            updateMessageStatement = "UPDATE " + getFullMessageTableName() + " SET MSG=? WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
         }
         return updateMessageStatement;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
index 3243048..dc59621 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
@@ -252,6 +252,24 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
         }
     }
 
+    @Override
+    public void doUpdateMessage(TransactionContext c, ActiveMQDestination destination, MessageId id, byte[] data) throws SQLException, IOException {
+        PreparedStatement s = null;
+        cleanupExclusiveLock.readLock().lock();
+        try {
+            s = c.getConnection().prepareStatement(this.statements.getUpdateMessageStatement());
+            setBinaryData(s, 1, data);
+            s.setString(2, id.getProducerId().toString());
+            s.setLong(3, id.getProducerSequenceId());
+            s.setString(4, destination.getQualifiedName());
+            if (s.executeUpdate() != 1) {
+                throw new IOException("Could not update message: " + id + " in " + destination);
+            }
+        } finally {
+            cleanupExclusiveLock.readLock().unlock();
+            close(s);
+        }
+    }
 
 
     public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index 9bfbd83..d8b986e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -555,18 +555,6 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
         letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
     }
 
-    /**
-     * When true, persist the redelivery status such that the message redelivery flag can survive a broker failure
-     * used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean)  true
-     */
-    public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
-        letter.setRewriteOnRedelivery(rewriteOnRedelivery);
-    }
-
-    public boolean isRewriteOnRedelivery() {
-        return letter.isRewriteOnRedelivery();
-    }
-
     public float getIndexLFUEvictionFactor() {
         return letter.getIndexLFUEvictionFactor();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 9255c00..1e84642 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -60,10 +60,12 @@ import org.apache.activemq.store.*;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
+import org.apache.activemq.store.kahadb.data.KahaEntryType;
 import org.apache.activemq.store.kahadb.data.KahaLocation;
 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
+import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
 import org.apache.activemq.usage.MemoryUsage;
@@ -277,46 +279,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
     }
 
     @Override
-    void incrementRedeliveryAndReWrite(final String key, final KahaDestination destination) throws IOException {
-        Location location;
-        this.indexLock.writeLock().lock();
-        try {
-              location = findMessageLocation(key, destination);
-        } finally {
-            this.indexLock.writeLock().unlock();
-        }
-
-        if (location != null) {
-            KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
-            Message message = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
-
-            message.incrementRedeliveryCounter();
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("rewriting: " + key + " with deliveryCount: " + message.getRedeliveryCounter());
-            }
-            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
-            addMessage.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
-
-            final Location rewriteLocation = journal.write(toByteSequence(addMessage), true);
-
-            this.indexLock.writeLock().lock();
-            try {
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                    @Override
-                    public void execute(Transaction tx) throws IOException {
-                        StoredDestination sd = getStoredDestination(destination, tx);
-                        Long sequence = sd.messageIdIndex.get(tx, key);
-                        MessageKeys keys = sd.orderIndex.get(tx, sequence);
-                        sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), sequence, new MessageKeys(keys.messageId, rewriteLocation));
-                    }
-                });
-            } finally {
-                this.indexLock.writeLock().unlock();
-            }
-        }
-    }
-
-    @Override
     void rollbackStatsOnDuplicate(KahaDestination commandDestination) {
         if (brokerService != null) {
             RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
@@ -465,6 +427,22 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
 
         }
 
+        public void updateMessage(Message message) throws IOException {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
+            }
+            KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand();
+            KahaAddMessageCommand command = new KahaAddMessageCommand();
+            command.setDestination(dest);
+            command.setMessageId(message.getMessageId().toProducerKey());
+            command.setPriority(message.getPriority());
+            command.setPrioritySupported(prioritizedMessages);
+            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
+            command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
+            updateMessageCommand.setMessage(command);
+            store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null);
+        }
+
         @Override
         public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
             KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
@@ -1126,7 +1104,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
      * @throws IOException
      */
     Message loadMessage(Location location) throws IOException {
-        KahaAddMessageCommand addMessage = (KahaAddMessageCommand) load(location);
+        JournalCommand<?> command = load(location);
+        KahaAddMessageCommand addMessage = null;
+        switch (command.type()) {
+            case KAHA_UPDATE_MESSAGE_COMMAND:
+                addMessage = ((KahaUpdateMessageCommand)command).getMessage();
+                break;
+            default:
+                addMessage = (KahaAddMessageCommand) command;
+        }
         Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput()));
         return msg;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 78e26a9..df970d4 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -69,6 +69,7 @@ import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
+import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
 import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
 import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
 import org.apache.activemq.store.kahadb.disk.index.ListIndex;
@@ -1113,6 +1114,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             public void visit(KahaTraceCommand command) {
                 processLocation(location);
             }
+
+            @Override
+            public void visit(KahaUpdateMessageCommand command) throws IOException {
+                process(command, location);
+            }
         });
     }
 
@@ -1127,7 +1133,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     @Override
                     public void execute(Transaction tx) throws IOException {
-                        upadateIndex(tx, command, location);
+                        updateIndex(tx, command, location);
                     }
                 });
             } finally {
@@ -1137,6 +1143,21 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     }
 
     @SuppressWarnings("rawtypes")
+    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
+        this.indexLock.writeLock().lock();
+        try {
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                @Override
+                public void execute(Transaction tx) throws IOException {
+                    updateIndex(tx, command, location);
+                }
+            });
+        } finally {
+            this.indexLock.writeLock().unlock();
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
     protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
         if (command.hasTransactionInfo()) {
            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
@@ -1253,27 +1274,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 updates = preparedTransactions.remove(key);
             }
         }
-        if (isRewriteOnRedelivery()) {
-            persistRedeliveryCount(updates);
-        }
     }
 
-    @SuppressWarnings("rawtypes")
-    private void persistRedeliveryCount(List<Operation> updates)  throws IOException {
-        if (updates != null) {
-            for (Operation operation : updates) {
-                operation.getCommand().visit(new Visitor() {
-                    @Override
-                    public void visit(KahaRemoveMessageCommand command) throws IOException {
-                        incrementRedeliveryAndReWrite(command.getMessageId(), command.getDestination());
-                    }
-                });
-            }
-        }
-    }
-
-   abstract void incrementRedeliveryAndReWrite(String key, KahaDestination destination) throws IOException;
-
     // /////////////////////////////////////////////////////////////////
     // These methods do the actual index updates.
     // /////////////////////////////////////////////////////////////////
@@ -1281,7 +1283,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
     private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
 
-    void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
+    void updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
 
         // Skip adding the message to the index if this is a topic and there are
@@ -1320,6 +1322,25 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         metadata.lastUpdate = location;
     }
 
+    void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
+        KahaAddMessageCommand command = updateMessageCommand.getMessage();
+        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
+
+        Long id = sd.messageIdIndex.get(tx, command.getMessageId());
+        if (id != null) {
+            sd.orderIndex.put(
+                    tx,
+                    command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
+                    id,
+                    new MessageKeys(command.getMessageId(), location)
+            );
+            sd.locationIndex.put(tx, location, id);
+        } else {
+            LOG.warn("Non existent message update attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
+        }
+        metadata.lastUpdate = location;
+    }
+
     abstract void rollbackStatsOnDuplicate(KahaDestination commandDestination);
 
     void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
@@ -2382,7 +2403,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
         @Override
         public void execute(Transaction tx) throws IOException {
-            upadateIndex(tx, command, location);
+            updateIndex(tx, command, location);
         }
 
     }
@@ -2612,14 +2633,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         this.directoryArchive = directoryArchive;
     }
 
-    public boolean isRewriteOnRedelivery() {
-        return rewriteOnRedelivery;
-    }
-
-    public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
-        this.rewriteOnRedelivery = rewriteOnRedelivery;
-    }
-
     public boolean isArchiveCorruptedIndex() {
         return archiveCorruptedIndex;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
index 03072fe..be4f2ff 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
@@ -28,6 +28,7 @@ import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
+import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
 
 public class Visitor {
 
@@ -60,4 +61,7 @@ public class Visitor {
 
     public void visit(KahaAckMessageFileMapCommand kahaProducerAuditCommand) throws IOException {
     }
+
+    public void visit(KahaUpdateMessageCommand kahaUpdateMessageCommand) throws IOException {
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-kahadb-store/src/main/proto/journal-data.proto
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/proto/journal-data.proto b/activemq-kahadb-store/src/main/proto/journal-data.proto
index 9d26308..8290c4c 100644
--- a/activemq-kahadb-store/src/main/proto/journal-data.proto
+++ b/activemq-kahadb-store/src/main/proto/journal-data.proto
@@ -31,6 +31,7 @@ enum KahaEntryType {
   KAHA_SUBSCRIPTION_COMMAND = 7;
   KAHA_PRODUCER_AUDIT_COMMAND = 8;
   KAHA_ACK_MESSAGE_FILE_MAP_COMMAND = 9;
+  KAHA_UPDATE_MESSAGE_COMMAND = 10;
 }
 
 message KahaTraceCommand {
@@ -58,6 +59,13 @@ message KahaAddMessageCommand {
   optional bool prioritySupported = 6;
 }
 
+message KahaUpdateMessageCommand {
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaUpdateMessageCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+  required KahaAddMessageCommand message = 1;
+}
+
 message KahaRemoveMessageCommand {
   //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRemoveMessageCommand>";
   //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 7b90b0c..9256bb5 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -394,7 +394,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
           if( prepared ) {
             store.preparedAcks.remove(ack.getLastMessageId)
           }
-          uow.incrementRedelivery(store.key, ack.getLastMessageId)
         }
       }
     }
@@ -701,6 +700,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
       waitOn(asyncAddQueueMessage(context, message, delay))
     }
 
+    override def updateMessage(message: Message): Unit = {
+      check_running
+      // the only current usage of update is to increment the redelivery counter
+      withUow {uow => uow.incrementRedelivery(key, message.getMessageId)}
+    }
+
     def doRemove(uow: DelayableUOW, id: MessageId): CountDownFuture[AnyRef] = {
       uow.dequeue(key, id)
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
index 3fc7cf7..46eecfb 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
@@ -199,6 +199,22 @@ public abstract class TestSupport extends CombinationTestSupport {
         return adapter;
     }
 
+    public void stopBrokerWithStoreFailure(BrokerService broker, PersistenceAdapterChoice choice) throws Exception {
+        switch (choice) {
+            case KahaDB:
+                KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+
+                // have the broker stop with an IOException on next checkpoint so it has a pending local transaction to recover
+                kahaDBPersistenceAdapter.getStore().getJournal().close();
+                break;
+            default:
+                // just stop normally by default
+                broker.stop();
+        }
+        broker.waitUntilStopped();
+    }
+
+
     /**
      * Test if base directory contains spaces
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
index a25b071..c4e3848 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
@@ -59,6 +59,7 @@ public class BrokerRestartTestSupport extends BrokerTestSupport {
      */
     protected void restartBroker() throws Exception {
         broker.stop();
+        broker.waitUntilStopped();
         broker = createRestartedBroker();
         broker.start();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-unit-tests/src/test/java/org/apache/activemq/broker/LevelDBRedeliveryRestartTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/LevelDBRedeliveryRestartTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/LevelDBRedeliveryRestartTest.java
deleted file mode 100644
index decf4d4..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/LevelDBRedeliveryRestartTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker;
-
-import junit.framework.Test;
-import org.apache.activemq.leveldb.LevelDBStore;
-
-import java.io.IOException;
-
-/**
- */
-public class LevelDBRedeliveryRestartTest extends RedeliveryRestartTest {
-    @Override
-    protected void configureBroker(BrokerService broker) throws Exception {
-        broker.setDestinationPolicy(policyMap);
-        LevelDBStore store = new LevelDBStore();
-        broker.setPersistenceAdapter(store);
-        broker.addConnector("tcp://0.0.0.0:0");
-    }
-
-    @Override
-    protected void stopBrokerWithStoreFailure() throws Exception {
-        broker.stop();
-        broker.waitUntilStopped();
-    }
-
-    public static Test suite() {
-        return suite(LevelDBRedeliveryRestartTest.class);
-    }
-
-    public static void main(String[] args) {
-        junit.textui.TestRunner.run(suite());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
index 922b67a..8eba729 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker;
 
+import java.util.Arrays;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -23,44 +24,123 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-
-import junit.framework.Test;
-
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.transport.failover.FailoverTransport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
-public class RedeliveryRestartTest extends BrokerRestartTestSupport {
+@RunWith(value = Parameterized.class)
+public class RedeliveryRestartTest extends TestSupport {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryRestartTest.class);
+    ActiveMQConnection connection;
+    BrokerService broker = null;
+    String queueName = "redeliveryRestartQ";
+
+    @Parameterized.Parameter
+    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = PersistenceAdapterChoice.KahaDB;
+
+    @Parameterized.Parameters(name="Store={0}")
+    public static Iterable<Object[]> data() {
+        return Arrays.asList(new Object[][]{{TestSupport.PersistenceAdapterChoice.KahaDB},{TestSupport.PersistenceAdapterChoice.JDBC},{TestSupport.PersistenceAdapterChoice.LevelDB}});
+    }
 
     @Override
-    protected void setUp() throws Exception {
-        setAutoFail(true);
-        setMaxTestTime(2 * 60 * 1000);
+    @Before
+    public void setUp() throws Exception {
         super.setUp();
-
+        broker = new BrokerService();
+        configureBroker(broker);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.start();
     }
 
     @Override
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        broker.stop();
+        super.tearDown();
+    }
+
     protected void configureBroker(BrokerService broker) throws Exception {
-        super.configureBroker(broker);
-        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
-        kahaDBPersistenceAdapter.setRewriteOnRedelivery(true);
-        kahaDBPersistenceAdapter.setCleanupInterval(500);
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setPersistJMSRedelivered(true);
+        policyMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(policyMap);
+        setPersistenceAdapter(broker, persistenceAdapterChoice);
         broker.addConnector("tcp://0.0.0.0:0");
     }
 
+    @org.junit.Test
+    public void testValidateRedeliveryFlagAfterRestartNoTx() throws Exception {
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString()
+            + ")?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        populateDestination(10, queueName, connection);
+
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Destination destination = session.createQueue(queueName);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        TextMessage msg = null;
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) consumer.receive(20000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+        }
+        consumer.close();
+
+        restartBroker();
+
+        // make failover aware of the restarted auto assigned port
+        connection.getTransport().narrow(FailoverTransport.class).add(true, broker.getTransportConnectors().get(0)
+                .getPublishableConnectString());
+
+        consumer = session.createConsumer(destination);
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("redelivered? got: " + msg);
+            assertNotNull("got the message again", msg);
+            assertEquals("re delivery flag", true, msg.getJMSRedelivered());
+            assertEquals("redelivery count survives restart", 2, msg.getLongProperty("JMSXDeliveryCount"));
+            msg.acknowledge();
+        }
+
+        // consume the rest that were not redeliveries
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) consumer.receive(20000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+            assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
+            msg.acknowledge();
+        }
+        connection.close();
+    }
+
+    @org.junit.Test
     public void testValidateRedeliveryFlagAfterRestart() throws Exception {
 
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString()
-            + ")?jms.transactedIndividualAck=true");
-        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+            + ")?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
         connection.start();
 
         populateDestination(10, queueName, connection);
@@ -109,10 +189,11 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
         connection.close();
     }
 
+    @org.junit.Test
     public void testValidateRedeliveryFlagAfterRecovery() throws Exception {
         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
-            + "?jms.transactedIndividualAck=true");
-        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
+            + "?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
         connection.start();
 
         populateDestination(1, queueName, connection);
@@ -121,19 +202,20 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
         Destination destination = session.createQueue(queueName);
 
         MessageConsumer consumer = session.createConsumer(destination);
-        TextMessage msg = (TextMessage) consumer.receive(20000);
+        TextMessage msg = (TextMessage) consumer.receive(5000);
         LOG.info("got: " + msg);
         assertNotNull("got the message", msg);
         assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
         assertEquals("not a redelivery", false, msg.getJMSRedelivered());
 
-        stopBrokerWithStoreFailure();
+        stopBrokerWithStoreFailure(broker, persistenceAdapterChoice);
 
         broker = createRestartedBroker();
         broker.start();
 
-        connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
-            + "?jms.transactedIndividualAck=true");
+        connection.close();
+
+        connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
         connection = (ActiveMQConnection) connectionFactory.createConnection();
         connection.start();
 
@@ -148,12 +230,17 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
         connection.close();
     }
 
-    protected void stopBrokerWithStoreFailure() throws Exception {
-        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
-
-        // have the broker stop with an IOException on next checkpoint so it has a pending local transaction to recover
-        kahaDBPersistenceAdapter.getStore().getJournal().close();
+    private void restartBroker() throws Exception {
+        broker.stop();
         broker.waitUntilStopped();
+        broker = createRestartedBroker();
+        broker.start();
+    }
+
+    private BrokerService createRestartedBroker() throws Exception {
+        broker = new BrokerService();
+        configureBroker(broker);
+        return broker;
     }
 
     private void populateDestination(final int nbMessages, final String destinationName, javax.jms.Connection connection) throws JMSException {
@@ -166,12 +253,4 @@ public class RedeliveryRestartTest extends BrokerRestartTestSupport {
         producer.close();
         session.close();
     }
-
-    public static Test suite() {
-        return suite(RedeliveryRestartTest.class);
-    }
-
-    public static void main(String[] args) {
-        junit.textui.TestRunner.run(suite());
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
index 842563d..c8d4c51 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
@@ -16,12 +16,11 @@
  */
 package org.apache.activemq.transport.failover;
 
-import java.io.IOException;
 import junit.framework.Test;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 
 public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest {
 
@@ -38,22 +37,14 @@ public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest {
     @Override
     public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
         BrokerService brokerService = super.createBroker(deleteAllMessagesOnStartup, bindAddress);
-        configurePersistenceAdapter(brokerService);
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setPersistJMSRedelivered(true);
+        policyMap.setDefaultEntry(defaultEntry);
+        brokerService.setDestinationPolicy(policyMap);
         return brokerService;
     }
 
-    private void configurePersistenceAdapter(BrokerService brokerService) throws IOException {
-         KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter();
-         kahaDBPersistenceAdapter.setRewriteOnRedelivery(true);
-    }
-
-    @Override
-    public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
-        PersistenceAdapter persistenceAdapter = super.setDefaultPersistenceAdapter(broker);
-        configurePersistenceAdapter(broker);
-        return persistenceAdapter;
-    }
-
     // no point rerunning these
     @Override
     public void testFailoverProducerCloseBeforeTransaction() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/266d23ef/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
index de9e11c..21c39ef 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
@@ -50,7 +50,7 @@ public class MemoryLimitTest extends TestSupport {
     @Parameterized.Parameter
     public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;
 
-    @Parameterized.Parameters(name="{0}")
+    @Parameterized.Parameters(name="store={0}")
     public static Iterable<Object[]> getTestParameters() {
         return Arrays.asList(new Object[][]{{TestSupport.PersistenceAdapterChoice.KahaDB}, {PersistenceAdapterChoice.LevelDB}, {PersistenceAdapterChoice.JDBC}});
     }


Mime
View raw message