activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [3/4] activemq-artemis git commit: ARTEMIS-1089 Fixing Replication catchup slow
Date Mon, 03 Apr 2017 11:54:52 GMT
ARTEMIS-1089 Fixing Replication catchup slow


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0b62f698
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0b62f698
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0b62f698

Branch: refs/heads/master
Commit: 0b62f698c3aaff5c109e625185332d2506afdaf4
Parents: 739dd82
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Sun Apr 2 19:20:42 2017 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Sun Apr 2 20:20:45 2017 -0400

----------------------------------------------------------------------
 .../cursor/impl/PageCursorProviderImpl.java     |  1 +
 .../core/paging/impl/PagingStoreImpl.java       | 31 ++++---
 .../impl/journal/JournalStorageManager.java     |  2 +-
 .../wireformat/ReplicationSyncFileMessage.java  | 13 ++-
 .../core/replication/ReplicationEndpoint.java   |  2 +-
 .../core/replication/ReplicationManager.java    | 97 ++++++++++++--------
 .../core/server/ActiveMQServerLogger.java       |  5 +-
 7 files changed, 87 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0b62f698/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 76ad26b..701f86c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -332,6 +332,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
    @Override
    public void resumeCleanup() {
       this.cleanupEnabled = true;
+      scheduleCleanup();
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0b62f698/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index a8e2190..03f53c6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -1123,28 +1123,29 @@ public class PagingStoreImpl implements PagingStore {
 
    @Override
    public Collection<Integer> getCurrentIds() throws Exception {
-      List<Integer> ids = new ArrayList<>();
-      if (fileFactory != null) {
-         for (String fileName : fileFactory.listFiles("page")) {
-            ids.add(getPageIdFromFileName(fileName));
+      lock.writeLock().lock();
+      try {
+         List<Integer> ids = new ArrayList<>();
+         if (fileFactory != null) {
+            for (String fileName : fileFactory.listFiles("page")) {
+               ids.add(getPageIdFromFileName(fileName));
+            }
          }
+         return ids;
+      } finally {
+         lock.writeLock().unlock();
       }
-      return ids;
    }
 
    @Override
    public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds)
throws Exception {
-      lock.writeLock().lock();
-      try {
-         for (Integer id : pageIds) {
-            SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id));
-            if (!sFile.exists()) {
-               continue;
-            }
-            replicator.syncPages(sFile, id, getAddress());
+      for (Integer id : pageIds) {
+         SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id));
+         if (!sFile.exists()) {
+            continue;
          }
-      } finally {
-         lock.writeLock().unlock();
+         ActiveMQServerLogger.LOGGER.replicaSyncFile(sFile, sFile.size());
+         replicator.syncPages(sFile, id, getAddress());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0b62f698/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index c31de52..c8dbd44 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -587,10 +587,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
          stopReplication();
          throw e;
       } finally {
-         pagingManager.resumeCleanup();
          // Re-enable compact and reclaim of journal files
          originalBindingsJournal.replicationSyncFinished();
          originalMessageJournal.replicationSyncFinished();
+         pagingManager.resumeCleanup();
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0b62f698/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
index de7f73e..90d2ca0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
@@ -16,11 +16,11 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.Set;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
@@ -42,7 +42,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
     */
    private long fileId;
    private int dataSize;
-   private ByteBuffer byteBuffer;
+   private ByteBuf byteBuffer;
    private byte[] byteArray;
    private SimpleString pageStoreName;
    private FileType fileType;
@@ -78,7 +78,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
                                      SimpleString storeName,
                                      long id,
                                      int size,
-                                     ByteBuffer buffer) {
+                                     ByteBuf buffer) {
       this();
       this.byteBuffer = buffer;
       this.pageStoreName = storeName;
@@ -124,7 +124,12 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
        * (which might receive appends)
        */
       if (dataSize > 0) {
-         buffer.writeBytes(byteBuffer);
+         buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
+      }
+
+      if (byteBuffer != null) {
+         byteBuffer.release();
+         byteBuffer = null;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0b62f698/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index e82d38e..4bf2726 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -411,7 +411,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       if (!channel1.isOpen()) {
          channel1.open();
       }
-      channel1.writeDirect(ByteBuffer.wrap(data), true);
+      channel1.writeDirect(ByteBuffer.wrap(data), false);
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0b62f698/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index dce5990..8b91c02 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -25,8 +25,11 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -122,6 +125,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
 
    private final ExecutorFactory executorFactory;
 
+   private final Executor replicationStream;
+
    private SessionFailureListener failureListener;
 
    private CoreRemotingConnection remotingConnection;
@@ -141,6 +146,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
       this.executorFactory = executorFactory;
       this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id,
-1);
       this.remotingConnection = remotingConnection;
+      this.replicationStream = executorFactory.getExecutor();
       this.timeout = timeout;
    }
 
@@ -178,7 +184,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
                                   boolean sync,
                                   final boolean lineUp) throws Exception {
       if (enabled) {
-         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
+         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp,
true);
       }
    }
 
@@ -343,15 +349,15 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
    }
 
    private OperationContext sendReplicatePacket(final Packet packet) {
-      return sendReplicatePacket(packet, true);
+      return sendReplicatePacket(packet, true, true);
    }
 
-   private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
+   private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean
useExecutor) {
       if (!enabled)
          return null;
       boolean runItNow = false;
 
-      OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
+      final OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
       if (lineUp) {
          repliToken.replicationLineUp();
       }
@@ -359,10 +365,17 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
       synchronized (replicationLock) {
          if (enabled) {
             pendingTokens.add(repliToken);
-            if (!flowControl()) {
-               return repliToken;
+            if (useExecutor) {
+               replicationStream.execute(() -> {
+                  if (enabled) {
+                     flowControl();
+                     replicatingChannel.send(packet);
+                  }
+               });
+            } else {
+               flowControl();
+               replicatingChannel.send(packet);
             }
-            replicatingChannel.send(packet);
          } else {
             // Already replicating channel failed, so just play the action now
             runItNow = true;
@@ -383,33 +396,35 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
     * In case you refactor this in any way, this method must hold a lock on replication lock.
.
     */
    private boolean flowControl() {
-      // synchronized (replicationLock) { -- I'm not adding this because the caller already
has it
-      // future maintainers of this code please be aware that the intention here is hold
the lock on replication lock
-      if (!replicatingChannel.getConnection().isWritable(this)) {
-         try {
-            logger.trace("flowControl waiting on writable");
-            writable.set(false);
-            //don't wait for ever as this may hang tests etc, we've probably been closed
anyway
-            long now = System.currentTimeMillis();
-            long deadline = now + timeout;
-            while (!writable.get() && now < deadline) {
-               replicationLock.wait(deadline - now);
-               now = System.currentTimeMillis();
-            }
-            logger.trace("flow control done");
-
-            if (!writable.get()) {
-               ActiveMQServerLogger.LOGGER.slowReplicationResponse();
-               logger.tracef("There was no response from replication backup after %s seconds,
server being stopped now", System.currentTimeMillis() - now);
-               try {
-                  stop();
-               } catch (Exception e) {
-                  logger.warn(e.getMessage(), e);
+      synchronized (replicationLock) {
+         // synchronized (replicationLock) { -- I'm not adding this because the caller already
has it
+         // future maintainers of this code please be aware that the intention here is hold
the lock on replication lock
+         if (!replicatingChannel.getConnection().isWritable(this)) {
+            try {
+               logger.trace("flowControl waiting on writable replication");
+               writable.set(false);
+               //don't wait for ever as this may hang tests etc, we've probably been closed
anyway
+               long now = System.currentTimeMillis();
+               long deadline = now + timeout;
+               while (!writable.get() && now < deadline) {
+                  replicationLock.wait(deadline - now);
+                  now = System.currentTimeMillis();
+               }
+               logger.trace("flow control done on replication");
+
+               if (!writable.get()) {
+                  ActiveMQServerLogger.LOGGER.slowReplicationResponse();
+                  logger.tracef("There was no response from replication backup after %s seconds,
server being stopped now", System.currentTimeMillis() - now);
+                  try {
+                     stop();
+                  } catch (Exception e) {
+                     logger.warn(e.getMessage(), e);
+                  }
+                  return false;
                }
-               return false;
+            } catch (InterruptedException e) {
+               throw new ActiveMQInterruptedException(e);
             }
-         } catch (InterruptedException e) {
-            throw new ActiveMQInterruptedException(e);
          }
       }
       return true;
@@ -515,7 +530,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
       }
       SequentialFile file = jf.getFile().cloneFile();
       try {
-         ActiveMQServerLogger.LOGGER.journalSynch(jf, file.size(), file);
+         ActiveMQServerLogger.LOGGER.replicaSyncFile(file, file.size());
          sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
       } finally {
          if (file.isOpen())
@@ -560,10 +575,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
             // We can afford having a single buffer here for this entire loop
             // because sendReplicatePacket will encode the packet as a NettyBuffer
             // through ActiveMQBuffer class leaving this buffer free to be reused on the
next copy
-            final ByteBuffer buffer = ByteBuffer.allocate(1 << 17); // 1 << 17
== 131072 == 128 * 1024
+            int size = 1 << 17;
             while (true) {
-               buffer.clear();
-               final int bytesRead = channel.read(buffer);
+               final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
+               ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
+               final int bytesRead = channel.read(byteBuffer);
                int toSend = bytesRead;
                if (bytesRead > 0) {
                   if (bytesRead >= maxBytesToSend) {
@@ -572,12 +588,13 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
                   } else {
                      maxBytesToSend = maxBytesToSend - bytesRead;
                   }
-                  buffer.limit(toSend);
                }
-               buffer.rewind();
-
+               logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
                // sending -1 or 0 bytes will close the file at the backup
-               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id,
toSend, buffer));
+               // We cannot simply send everything of a file through the executor,
+               // otherwise we would run out of memory.
+               // so we don't use the executor here
+               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id,
toSend, buffer), true, false);
                if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
                   break;
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0b62f698/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index d89c356..56a4d32 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
-import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -189,8 +188,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
    void backupServerSynched(ActiveMQServerImpl server);
 
    @LogMessage(level = Logger.Level.INFO)
-   @Message(id = 221025, value = "Replication: sending {0} (size={1}) to backup. {2}", format
= Message.Format.MESSAGE_FORMAT)
-   void journalSynch(JournalFile jf, Long size, SequentialFile file);
+   @Message(id = 221025, value = "Replication: sending {0} (size={1}) to replica.", format
= Message.Format.MESSAGE_FORMAT)
+   void replicaSyncFile(SequentialFile jf, Long size);
 
    @LogMessage(level = Logger.Level.INFO)
    @Message(


Mime
View raw message