activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From michaelandrepearce <...@git.apache.org>
Subject [GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Date Wed, 09 Jan 2019 22:52:44 GMT
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2494#discussion_r246577156
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
---
    @@ -48,54 +82,228 @@ public long getPageId() {
        }
     
        @Override
    -   public synchronized int getNumberOfMessages() {
    -      return messages.size();
    +   public int getNumberOfMessages() {
    +      while (true) {
    +         final long size = producerIndex;
    +         if (size == RESIZING) {
    +            Thread.yield();
    +            continue;
    +         }
    +         return (int) Math.min(size, Integer.MAX_VALUE);
    +      }
        }
     
        @Override
    -   public synchronized void setMessages(PagedMessage[] messages) {
    +   public void setMessages(PagedMessage[] messages) {
           // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway
           for (PagedMessage msg : messages) {
              addLiveMessage(msg);
           }
        }
     
        @Override
    -   public synchronized PagedMessage getMessage(int messageNumber) {
    -      if (messageNumber < messages.size()) {
    -         return messages.get(messageNumber);
    -      } else {
    +   public PagedMessage getMessage(int messageNumber) {
    +      if (messageNumber < 0) {
              return null;
           }
    +      //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends
    +      long size = lastSeenProducerIndex;
    +      if (messageNumber >= size) {
    +         while ((size = producerIndex) == RESIZING) {
    +            Thread.yield();
    +         }
    +         //it is a message over the current size?
    +         if (messageNumber >= size) {
    +            return null;
    +         }
    +         //publish it for others consumers
    +         LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
    +      }
    +      final AtomicChunk<PagedMessage> buffer;
    +      final int offset;
    +      if (messageNumber >= chunkSize) {
    +         offset = messageNumber & chunkMask;
    +         //slow path is moved in a separate method
    +         buffer = jump(messageNumber, size);
    +      } else {
    +         offset = messageNumber;
    +         buffer = consumerBuffer;
    +      }
    +      //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set
    +      PagedMessage msg;
    +      while ((msg = buffer.get(offset)) == null) {
    +         Thread.yield();
    +      }
    +      return msg;
    +   }
    +
    +   /**
    +    * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
    +    * ie backward search of a node if needed.
    +    */
    +   private AtomicChunk<PagedMessage> jump(final int messageNumber, final long size) {
    +      //fast division by a power of 2
    +      final int jumps = messageNumber >> chunkSizeLog2;
    --- End diff --
    
    cough ;) ... for the same comment you left me .... :P


---

Mime
View raw message