activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1416989 - in /activemq/trunk: activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Date Tue, 04 Dec 2012 15:25:56 GMT
Author: tabish
Date: Tue Dec  4 15:25:54 2012
New Revision: 1416989

URL: http://svn.apache.org/viewvc?rev=1416989&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3833

Allows the store to detect changes in the Audit and only write it if it's dirty.

Modified:
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java?rev=1416989&r1=1416988&r2=1416989&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java Tue Dec  4 15:25:54 2012
@@ -30,8 +30,8 @@ import org.apache.activemq.util.LRUCache
 
 /**
  * Provides basic audit functions for Messages without sync
- * 
- * 
+ *
+ *
  */
 public class ActiveMQMessageAuditNoSync implements Serializable {
 
@@ -41,11 +41,11 @@ public class ActiveMQMessageAuditNoSync 
     public static final int MAXIMUM_PRODUCER_COUNT = 64;
     private int auditDepth;
     private int maximumNumberOfProducersToTrack;
-    private LRUCache<Object, BitArrayBin> map;
+    private final LRUCache<Object, BitArrayBin> map;
+    private transient boolean modified = true;
 
     /**
-     * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
-     * 64
+     * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack = 64
      */
     public ActiveMQMessageAuditNoSync() {
         this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
@@ -53,17 +53,16 @@ public class ActiveMQMessageAuditNoSync 
 
     /**
      * Construct a MessageAudit
-     * 
+     *
      * @param auditDepth range of ids to track
-     * @param maximumNumberOfProducersToTrack number of producers expected in
-     *                the system
+     * @param maximumNumberOfProducersToTrack number of producers expected in the system
      */
     public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) {
         this.auditDepth = auditDepth;
         this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
         this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
     }
-    
+
     /**
      * @return the auditDepth
      */
@@ -76,6 +75,7 @@ public class ActiveMQMessageAuditNoSync 
      */
     public void setAuditDepth(int auditDepth) {
         this.auditDepth = auditDepth;
+        this.modified = true;
     }
 
     /**
@@ -88,15 +88,15 @@ public class ActiveMQMessageAuditNoSync 
     /**
      * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
      */
-    public void setMaximumNumberOfProducersToTrack(
-            int maximumNumberOfProducersToTrack) {
+    public void setMaximumNumberOfProducersToTrack(int maximumNumberOfProducersToTrack) {
         this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
         this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
+        this.modified = true;
     }
 
     /**
      * Checks if this message has been seen before
-     * 
+     *
      * @param message
      * @return true if the message is a duplicate
      * @throws JMSException
@@ -108,7 +108,7 @@ public class ActiveMQMessageAuditNoSync 
     /**
      * checks whether this messageId has been seen before and adds this
      * messageId to the list
-     * 
+     *
      * @param id
      * @return true if the message is a duplicate
      */
@@ -120,10 +120,12 @@ public class ActiveMQMessageAuditNoSync 
             if (bab == null) {
                 bab = new BitArrayBin(auditDepth);
                 map.put(seed, bab);
+                modified = true;
             }
             long index = IdGenerator.getSequenceFromId(id);
             if (index >= 0) {
                 answer = bab.setBit(index, true);
+                modified = true;
             }
         }
         return answer;
@@ -131,7 +133,7 @@ public class ActiveMQMessageAuditNoSync 
 
     /**
      * Checks if this message has been seen before
-     * 
+     *
      * @param message
      * @return true if the message is a duplicate
      */
@@ -139,16 +141,16 @@ public class ActiveMQMessageAuditNoSync 
         MessageId id = message.getMessageId();
         return isDuplicate(id);
     }
-    
+
     /**
      * Checks if this messageId has been seen before
-     * 
+     *
      * @param id
      * @return true if the message is a duplicate
      */
     public boolean isDuplicate(final MessageId id) {
         boolean answer = false;
-        
+
         if (id != null) {
             ProducerId pid = id.getProducerId();
             if (pid != null) {
@@ -156,6 +158,7 @@ public class ActiveMQMessageAuditNoSync 
                 if (bab == null) {
                     bab = new BitArrayBin(auditDepth);
                     map.put(pid, bab);
+                    modified = true;
                 }
                 answer = bab.setBit(id.getProducerSequenceId(), true);
             }
@@ -165,17 +168,17 @@ public class ActiveMQMessageAuditNoSync 
 
     /**
      * mark this message as being received
-     * 
+     *
      * @param message
      */
     public void rollback(final MessageReference message) {
         MessageId id = message.getMessageId();
         rollback(id);
     }
-    
+
     /**
      * mark this message as being received
-     * 
+     *
      * @param id
      */
     public void rollback(final  MessageId id) {
@@ -185,6 +188,7 @@ public class ActiveMQMessageAuditNoSync 
                 BitArrayBin bab = map.get(pid);
                 if (bab != null) {
                     bab.setBit(id.getProducerSequenceId(), false);
+                    modified = true;
                 }
             }
         }
@@ -197,10 +201,11 @@ public class ActiveMQMessageAuditNoSync 
             if (bab != null) {
                 long index = IdGenerator.getSequenceFromId(id);
                 bab.setBit(index, false);
+                modified = true;
             }
         }
     }
-    
+
     /**
      * Check the message is in order
      * @param msg
@@ -210,7 +215,7 @@ public class ActiveMQMessageAuditNoSync 
     public boolean isInOrder(Message msg) throws JMSException {
         return isInOrder(msg.getJMSMessageID());
     }
-    
+
     /**
      * Check the message id is in order
      * @param id
@@ -218,7 +223,7 @@ public class ActiveMQMessageAuditNoSync 
      */
     public boolean isInOrder(final String id) {
         boolean answer = true;
-        
+
         if (id != null) {
             String seed = IdGenerator.getSeedFromId(id);
             if (seed != null) {
@@ -226,22 +231,22 @@ public class ActiveMQMessageAuditNoSync 
                 if (bab != null) {
                     long index = IdGenerator.getSequenceFromId(id);
                     answer = bab.isInOrder(index);
+                    modified = true;
                 }
-               
             }
         }
         return answer;
     }
-    
+
     /**
      * Check the MessageId is in order
-     * @param message 
+     * @param message
      * @return
      */
     public boolean isInOrder(final MessageReference message) {
         return isInOrder(message.getMessageId());
     }
-    
+
     /**
      * Check the MessageId is in order
      * @param id
@@ -257,6 +262,7 @@ public class ActiveMQMessageAuditNoSync 
                 if (bab == null) {
                     bab = new BitArrayBin(auditDepth);
                     map.put(pid, bab);
+                    modified = true;
                 }
                 answer = bab.isInOrder(id.getProducerSequenceId());
 
@@ -277,4 +283,36 @@ public class ActiveMQMessageAuditNoSync 
     public void clear() {
         map.clear();
     }
+
+    /**
+     * Returns if the Audit has been modified since last check, this method does not
+     * reset the modified flag.  If the caller needs to reset the flag in order to avoid
+     * serializing an unchanged Audit then it's up to them to reset it themselves.
+     *
+     * @return true if the Audit has been modified.
+     */
+    public boolean isModified() {
+        return this.modified;
+    }
+
+    public void setModified(boolean modified) {
+        this.modified = modified;
+    }
+
+    /**
+     * Reads and returns the current modified state of the Audit, once called the state is
+     * reset to false.  This method is useful for code that needs to know if it should write
+     * out the Audit or otherwise execute some logic based on the Audit having changed since
+     * last check.
+     *
+     * @return true if the Audit has been modified since last check.
+     */
+    public boolean modified() {
+        if (this.modified) {
+            this.modified = false;
+            return true;
+        }
+
+        return false;
+    }
 }

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1416989&r1=1416988&r2=1416989&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue Dec  4 15:25:54 2012
@@ -68,10 +68,6 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
-import org.apache.activemq.util.Callback;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
 import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
 import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
 import org.apache.activemq.store.kahadb.disk.index.ListIndex;
@@ -81,17 +77,21 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.store.kahadb.disk.page.Page;
 import org.apache.activemq.store.kahadb.disk.page.PageFile;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.DataByteArrayOutputStream;
 import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
-import org.apache.activemq.util.LockFile;
 import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
 import org.apache.activemq.store.kahadb.disk.util.Marshaller;
 import org.apache.activemq.store.kahadb.disk.util.Sequence;
 import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
 import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
 import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.Callback;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.LockFile;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -182,12 +182,14 @@ public abstract class MessageDatabase ex
     }
 
     class MetadataMarshaller extends VariableMarshaller<Metadata> {
+        @Override
         public Metadata readPayload(DataInput dataIn) throws IOException {
             Metadata rc = new Metadata();
             rc.read(dataIn);
             return rc;
         }
 
+        @Override
         public void writePayload(Metadata object, DataOutput dataOut) throws IOException
{
             object.write(dataOut);
         }
@@ -250,6 +252,7 @@ public abstract class MessageDatabase ex
             final PageFile pageFile = getPageFile();
             pageFile.load();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     if (pageFile.getPageCount() == 0) {
                         // First time this is created.. Initialize the metadata
@@ -275,6 +278,7 @@ public abstract class MessageDatabase ex
             // Perhaps we should just keep an index of file
             storedDestinations.clear();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     for (Iterator<Entry<String, StoredDestination>> iterator
= metadata.destinations.iterator(tx); iterator.hasNext();) {
                         Entry<String, StoredDestination> entry = iterator.next();
@@ -394,6 +398,7 @@ public abstract class MessageDatabase ex
             try {
                 if (metadata.page != null) {
                     pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                        @Override
                         public void execute(Transaction tx) throws IOException {
                             checkpointUpdate(tx, true);
                         }
@@ -422,6 +427,7 @@ public abstract class MessageDatabase ex
 
                 if (metadata.page != null) {
                     pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                        @Override
                         public void execute(Transaction tx) throws IOException {
                             tx.store(metadata.page, metadataMarshaller, true);
                         }
@@ -501,6 +507,7 @@ public abstract class MessageDatabase ex
 
             // We may have to undo some index updates.
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     recoverIndex(tx);
                 }
@@ -613,6 +620,7 @@ public abstract class MessageDatabase ex
             sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                 int last=-1;
 
+                @Override
                 public boolean isInterestedInKeysBetween(Location first, Location second)
{
                     if( first==null ) {
                         return !ss.contains(0, second.getDataFileId());
@@ -623,6 +631,7 @@ public abstract class MessageDatabase ex
                     }
                 }
 
+                @Override
                 public void visit(List<Location> keys, List<Long> values) {
                     for (Location l : keys) {
                         int fileId = l.getDataFileId();
@@ -765,6 +774,7 @@ public abstract class MessageDatabase ex
                 return;
             }
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     checkpointUpdate(tx, cleanup);
                 }
@@ -785,6 +795,7 @@ public abstract class MessageDatabase ex
         this.indexLock.writeLock().lock();
         try {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     checkpointUpdate(tx, false);
                 }
@@ -907,6 +918,7 @@ public abstract class MessageDatabase ex
         } else {
             // just recover producer audit
             data.visit(new Visitor() {
+                @Override
                 public void visit(KahaAddMessageCommand command) throws IOException {
                     metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
                 }
@@ -978,6 +990,7 @@ public abstract class MessageDatabase ex
             this.indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    @Override
                     public void execute(Transaction tx) throws IOException {
                         upadateIndex(tx, command, location);
                     }
@@ -997,6 +1010,7 @@ public abstract class MessageDatabase ex
             this.indexLock.writeLock().lock();
             try {
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    @Override
                     public void execute(Transaction tx) throws IOException {
                         updateIndex(tx, command, location);
                     }
@@ -1011,6 +1025,7 @@ public abstract class MessageDatabase ex
         this.indexLock.writeLock().lock();
         try {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     updateIndex(tx, command, location);
                 }
@@ -1024,6 +1039,7 @@ public abstract class MessageDatabase ex
         this.indexLock.writeLock().lock();
         try {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     updateIndex(tx, command, location);
                 }
@@ -1073,6 +1089,7 @@ public abstract class MessageDatabase ex
         this.indexLock.writeLock().lock();
         try {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                @Override
                 public void execute(Transaction tx) throws IOException {
                     for (Operation op : messagingTx) {
                         op.execute(tx);
@@ -1360,6 +1377,7 @@ public abstract class MessageDatabase ex
                 // Use a visitor to cut down the number of pages that we load
                 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>()
{
                     int last=-1;
+                    @Override
                     public boolean isInterestedInKeysBetween(Location first, Location second)
{
                         if( first==null ) {
                             SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
@@ -1385,6 +1403,7 @@ public abstract class MessageDatabase ex
                         }
                     }
 
+                    @Override
                     public void visit(List<Location> keys, List<Long> values)
{
                         for (Location l : keys) {
                             int fileId = l.getDataFileId();
@@ -1445,19 +1464,22 @@ public abstract class MessageDatabase ex
         }
     };
     private Location checkpointProducerAudit() throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutputStream oout = new ObjectOutputStream(baos);
-        oout.writeObject(metadata.producerSequenceIdTracker);
-        oout.flush();
-        oout.close();
-        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
-        Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
-        try {
-            location.getLatch().await();
-        } catch (InterruptedException e) {
-            throw new InterruptedIOException(e.toString());
+        if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oout = new ObjectOutputStream(baos);
+            oout.writeObject(metadata.producerSequenceIdTracker);
+            oout.flush();
+            oout.close();
+            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
+            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
+            try {
+                location.getLatch().await();
+            } catch (InterruptedException e) {
+                throw new InterruptedIOException(e.toString());
+            }
+            return location;
         }
-        return location;
+        return metadata.producerSequenceIdTrackerLocation;
     }
 
     public HashSet<Integer> getJournalFilesBeingReplicated() {
@@ -1495,10 +1517,12 @@ public abstract class MessageDatabase ex
     static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys>
{
         static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
 
+        @Override
         public MessageKeys readPayload(DataInput dataIn) throws IOException {
             return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
         }
 
+        @Override
         public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException
{
             dataOut.writeUTF(object.messageId);
             LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
@@ -1528,6 +1552,7 @@ public abstract class MessageDatabase ex
             this.priority = priority;
         }
 
+        @Override
         public String toString() {
             return "[" + lastAckedSequence + ":" + priority + "]";
         }
@@ -1535,11 +1560,13 @@ public abstract class MessageDatabase ex
 
     protected class LastAckMarshaller implements Marshaller<LastAck> {
 
+        @Override
         public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
             dataOut.writeLong(object.lastAckedSequence);
             dataOut.writeByte(object.priority);
         }
 
+        @Override
         public LastAck readPayload(DataInput dataIn) throws IOException {
             LastAck lastAcked = new LastAck();
             lastAcked.lastAckedSequence = dataIn.readLong();
@@ -1549,14 +1576,17 @@ public abstract class MessageDatabase ex
             return lastAcked;
         }
 
+        @Override
         public int getFixedSize() {
             return 9;
         }
 
+        @Override
         public LastAck deepCopy(LastAck source) {
             return new LastAck(source);
         }
 
+        @Override
         public boolean isDeepCopySupported() {
             return true;
         }
@@ -1581,6 +1611,7 @@ public abstract class MessageDatabase ex
 
     protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination>
{
 
+        @Override
         public StoredDestination readPayload(final DataInput dataIn) throws IOException {
             final StoredDestination value = new StoredDestination();
             value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile,
dataIn.readLong());
@@ -1595,6 +1626,7 @@ public abstract class MessageDatabase ex
                 } else {
                     // upgrade
                     pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                        @Override
                         public void execute(Transaction tx) throws IOException {
                             BTreeIndex<Long, HashSet<String>> oldAckPositions
=
                                 new BTreeIndex<Long, HashSet<String>>(pageFile,
dataIn.readLong());
@@ -1638,6 +1670,7 @@ public abstract class MessageDatabase ex
             } else {
                     // upgrade
                     pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                        @Override
                         public void execute(Transaction tx) throws IOException {
                             value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile,
tx.allocate());
                             value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
@@ -1655,6 +1688,7 @@ public abstract class MessageDatabase ex
             return value;
         }
 
+        @Override
         public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException
{
             dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
             dataOut.writeLong(value.locationIndex.getPageId());
@@ -1675,12 +1709,14 @@ public abstract class MessageDatabase ex
     static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand>
{
         final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
 
+        @Override
         public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
             KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
             rc.mergeFramed((InputStream)dataIn);
             return rc;
         }
 
+        @Override
         public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws
IOException {
             object.writeFramed((OutputStream)dataOut);
         }
@@ -2275,6 +2311,7 @@ public abstract class MessageDatabase ex
         this.checksumJournalFiles = checksumJournalFiles;
     }
 
+    @Override
     public void setBrokerService(BrokerService brokerService) {
         this.brokerService = brokerService;
     }
@@ -2408,6 +2445,7 @@ public abstract class MessageDatabase ex
             }
         }
 
+        @Override
         public String toString() {
            return "MessageOrderCursor:[def:" + defaultCursorPosition
                    + ", low:" + lowPriorityCursorPosition
@@ -2664,6 +2702,7 @@ public abstract class MessageDatabase ex
                 }
             }
 
+            @Override
             public boolean hasNext() {
                 if (currentIterator == null) {
                     if (highIterator != null) {
@@ -2712,6 +2751,7 @@ public abstract class MessageDatabase ex
                 return currentIterator.hasNext();
             }
 
+            @Override
             public Entry<Long, MessageKeys> next() {
                 Entry<Long, MessageKeys> result = currentIterator.next();
                 if (result != null) {
@@ -2731,6 +2771,7 @@ public abstract class MessageDatabase ex
                 return result;
             }
 
+            @Override
             public void remove() {
                 throw new UnsupportedOperationException();
             }
@@ -2741,6 +2782,7 @@ public abstract class MessageDatabase ex
     private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>>
{
         final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
 
+        @Override
         public void writePayload(HashSet<String> object, DataOutput dataOut) throws
IOException {
             ByteArrayOutputStream baos = new ByteArrayOutputStream();
             ObjectOutputStream oout = new ObjectOutputStream(baos);
@@ -2752,6 +2794,7 @@ public abstract class MessageDatabase ex
             dataOut.write(data);
         }
 
+        @Override
         @SuppressWarnings("unchecked")
         public HashSet<String> readPayload(DataInput dataIn) throws IOException {
             int dataLen = dataIn.readInt();



Mime
View raw message