activemq-dev mailing list archives

From "leisore" <leis...@foxmail.com>
Subject TopicStorePrefetch do not keep sync with store
Date Mon, 12 Jan 2015 14:39:22 GMT
Hi,

I have a question about the classes TopicStorePrefetch and KahaDBStore.KahaDBTopicMessageStore. Details follow.

When persistent messages are added to a cursor and there is no memory space left, the cursor cache is disabled. To stay in sync with the store, QueueStorePrefetch calls the method [KahaDBMessageStore.setBatch], but TopicStorePrefetch does nothing. This may lead to duplicate messages being recovered from the store.
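
To make the failure mode concrete, here is a minimal toy model of it. This is only a sketch, not ActiveMQ code: `ToyStore`, `deliver`, and the cache limit of 2 are made-up names and values for illustration, and the real cursor and store are considerably more involved. It assumes that once the cache is disabled, remaining messages are recovered from the store starting at the store's batch position.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: none of these classes are ActiveMQ APIs. */
class CursorSyncSketch {

    /** Hypothetical stand-in for a message store with a recovery ("batch") position. */
    static class ToyStore {
        private final List<Integer> messages = new ArrayList<>();
        private int batchPosition = 0; // index of the next message to recover

        void add(int id) {
            messages.add(id);
        }

        /** Move the recovery position just past the given message id. */
        void setBatch(int lastSeenId) {
            int idx = messages.indexOf(lastSeenId);
            if (idx >= 0) {
                batchPosition = idx + 1;
            }
        }

        /** Return everything from the recovery position onward and advance it. */
        List<Integer> recoverNextMessages() {
            List<Integer> out =
                new ArrayList<>(messages.subList(batchPosition, messages.size()));
            batchPosition = messages.size();
            return out;
        }
    }

    /**
     * Adds ids 1..5 to the store. The cursor caches only the first two ids and
     * then disables its cache ("memory full"). Returns the ids in the order a
     * consumer would see them: cached messages first, then whatever the store recovers.
     */
    static List<Integer> deliver(boolean syncOnCacheDisable) {
        ToyStore store = new ToyStore();
        List<Integer> cache = new ArrayList<>();
        boolean cacheEnabled = true;
        final int cacheLimit = 2; // assumed value, for illustration only

        for (int id = 1; id <= 5; id++) {
            store.add(id);
            if (cacheEnabled && cache.size() < cacheLimit) {
                cache.add(id);
            } else if (cacheEnabled) {
                cacheEnabled = false;
                if (syncOnCacheDisable && !cache.isEmpty()) {
                    // Tell the store which message the cache already holds last.
                    store.setBatch(cache.get(cache.size() - 1));
                }
            }
        }

        List<Integer> delivered = new ArrayList<>(cache);
        delivered.addAll(store.recoverNextMessages());
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliver(true));  // [1, 2, 3, 4, 5]
        System.out.println(deliver(false)); // [1, 2, 1, 2, 3, 4, 5]
    }
}
```

With the sync step, the store resumes after the last cached message. Without it, the store starts from its old position and hands back messages 1 and 2 a second time, which is the duplication described above.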
     
I think TopicStorePrefetch should apply logic similar to QueueStorePrefetch, like this:
     
KahaDBStore.KahaDBTopicMessageStore.setBatch:
===============================================
public void setBatch(final String clientId, final String subscriptionName,
                     final MessageId identity, final int priority) throws IOException {
    try {
        lockAsyncJobQueue();

        indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                public void execute(Transaction tx) throws IOException {
                    StoredDestination sd = getStoredDestination(dest, tx);
                    String key = identity.toString();
                    String subscriptionKey = subscriptionKey(clientId, subscriptionName);

                    // Look up the store sequence of the given message; nothing to do if unknown.
                    Long location = sd.messageIdIndex.get(tx, key);
                    if (location == null) {
                        return;
                    }

                    // Fetch the subscription's cursor, creating it on first use.
                    MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
                    if (moc == null) {
                        moc = new MessageOrderCursor();
                        sd.subscriptionCursors.put(subscriptionKey, moc);
                    }

                    // Move the cursor position for the matching priority band.
                    // Compared against DEF rather than HI: assuming HI is the maximum
                    // priority, "priority > HI" could never be true.
                    if (priority == MessageOrderIndex.DEF) {
                        moc.defaultCursorPosition = location.longValue();
                    } else if (priority > MessageOrderIndex.DEF) {
                        moc.highPriorityCursorPosition = location.longValue();
                    } else {
                        moc.lowPriorityCursorPosition = location.longValue();
                    }
                }
            });
        } finally {
            indexLock.writeLock().unlock();
        }
    } finally {
        unlockAsyncJobQueue();
    }
}
===============================================
     
But I'm not sure this is correct. Could someone please confirm?
     
     Thanks a lot.