activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1153125 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/store/kahadb/ main/java/org/apache/activemq/util/ test/java/org/apache/activemq/
Date Tue, 02 Aug 2011 13:46:07 GMT
Author: dejanb
Date: Tue Aug  2 13:46:07 2011
New Revision: 1153125

URL: http://svn.apache.org/viewvc?rev=1153125&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3422 - producer audit needs string rollback, plus
don't fail if we cannot recover producer audit

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java?rev=1153125&r1=1153124&r2=1153125&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
Tue Aug  2 13:46:07 2011
@@ -189,6 +189,17 @@ public class ActiveMQMessageAuditNoSync 
             }
         }
     }
+
+    public void rollback(final String id) {
+        String seed = IdGenerator.getSeedFromId(id);
+        if (seed != null) {
+            BitArrayBin bab = map.get(seed);
+            if (bab != null) {
+                long index = IdGenerator.getSequenceFromId(id);
+                bab.setBit(index, false);
+            }
+        }
+    }
     
     /**
      * Check the message is in order

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1153125&r1=1153124&r2=1153125&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Tue Aug  2 13:46:07 2011
@@ -510,12 +510,11 @@ public class MessageDatabase extends Ser
             try {
                 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
                 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
-            } catch (ClassNotFoundException cfe) {
-                IOException ioe = new IOException("Failed to read producerAudit: " + cfe);
-                ioe.initCause(cfe);
-                throw ioe;
+                return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
+            } catch (Exception e) {
+                LOG.warn("Cannot recover message audit", e);
+                return journal.getNextLocation(null);
             }
-            return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
         } else {
             // got no audit stored so got to recreate via replay from start of the journal
             return journal.getNextLocation(null);
@@ -546,7 +545,7 @@ public class MessageDatabase extends Ser
                 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                 sd.locationIndex.remove(tx, keys.location);
                 sd.messageIdIndex.remove(tx, keys.messageId);
-                metadata.producerSequenceIdTracker.rollback(new MessageId(keys.messageId));
+                metadata.producerSequenceIdTracker.rollback(keys.messageId);
                 undoCounter++;
                 // TODO: do we need to modify the ack positions for the pub sub case?
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java?rev=1153125&r1=1153124&r2=1153125&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IdGenerator.java Tue
Aug  2 13:46:07 2011
@@ -128,7 +128,7 @@ public class IdGenerator {
         if (id != null) {
             int index = id.lastIndexOf(':');
             if (index > 0 && (index + 1) < id.length()) {
-                result = id.substring(0, index + 1);
+                result = id.substring(0, index);
             }
         }
         return result;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java?rev=1153125&r1=1153124&r2=1153125&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
Tue Aug  2 13:46:07 2011
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;
 import junit.framework.TestCase;
@@ -119,4 +123,64 @@ public class ActiveMQMessageAuditTest ex
             assertFalse(audit.isDuplicate(id));
         }
     }
+
+    public void testSerialization() throws Exception {
+        ActiveMQMessageAuditNoSync audit = new ActiveMQMessageAuditNoSync();
+
+        byte[] bytes =  serialize(audit);
+        System.out.println(bytes.length);
+        audit = recover(bytes);
+
+        List<MessageReference> list = new ArrayList<MessageReference>();
+
+
+        for (int j = 0; j < 1000; j++) {
+            ProducerId pid = new ProducerId();
+            pid.setConnectionId("test");
+            pid.setSessionId(0);
+            pid.setValue(j);
+            System.out.println("producer " + j);
+
+            for (int i = 0; i < 1000; i++) {
+                MessageId id = new MessageId();
+                id.setProducerId(pid);
+                id.setProducerSequenceId(i);
+                ActiveMQMessage msg = new ActiveMQMessage();
+                msg.setMessageId(id);
+                list.add(msg);
+                assertFalse(audit.isDuplicate(msg.getMessageId().toString()));
+
+                if (i % 100 == 0) {
+                    bytes = serialize(audit);
+                    System.out.println(bytes.length);
+                    audit = recover(bytes);
+                }
+
+                if (i % 250 == 0) {
+                    for (MessageReference message : list) {
+                        audit.rollback(message.getMessageId().toString());
+                    }
+                    list.clear();
+                    bytes = serialize(audit);
+                    System.out.println(bytes.length);
+                    audit = recover(bytes);
+                }
+
+            }
+        }
+
+    }
+
+    protected byte[] serialize(ActiveMQMessageAuditNoSync audit) throws Exception {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oout = new ObjectOutputStream(baos);
+        oout.writeObject(audit);
+        oout.flush();
+        return baos.toByteArray();
+    }
+
+    protected ActiveMQMessageAuditNoSync recover(byte[] bytes) throws Exception {
+        ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(bytes));
+        return (ActiveMQMessageAuditNoSync)objectIn.readObject();
+    }
 }



Mime
View raw message