Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F2E88200D18 for ; Tue, 26 Sep 2017 20:48:52 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F18941609C4; Tue, 26 Sep 2017 18:48:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2A537160BDA for ; Tue, 26 Sep 2017 20:48:51 +0200 (CEST) Received: (qmail 88557 invoked by uid 500); 26 Sep 2017 18:48:50 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 88410 invoked by uid 99); 26 Sep 2017 18:48:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Sep 2017 18:48:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 42AEBF5B4E; Tue, 26 Sep 2017 18:48:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 26 Sep 2017 18:49:02 -0000 Message-Id: <5c93e77baafa464aacd317b6613e4642@git.apache.org> In-Reply-To: <0ce3c5feb7504a0f83a8c7854ef2924e@git.apache.org> References: <0ce3c5feb7504a0f83a8c7854ef2924e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [14/17] activemq-artemis git commit: ARTEMIS-1353 Initial replication of large messages out of executor archived-at: Tue, 26 Sep 2017 18:48:53 -0000 ARTEMIS-1353 Initial replication of large messages out of executor This is based on the work @jbertram made at the github pr #1466 and the discussions we had there (cherry picked from commit ce6942a9aa9375efaa449424fe89de2db3f22e36) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/492b55e0 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/492b55e0 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/492b55e0 Branch: refs/heads/1.x Commit: 492b55e09affb03a943c3516a5a3bf513024ca8b Parents: 5db0c87 Author: Clebert Suconic Authored: Fri Aug 18 15:01:33 2017 -0400 Committer: Clebert Suconic Committed: Tue Sep 26 14:28:07 2017 -0400 ---------------------------------------------------------------------- .../artemis/core/protocol/core/Packet.java | 7 ++ .../core/protocol/core/impl/PacketImpl.java | 1 + .../wireformat/ReplicationSyncFileMessage.java | 10 +++ .../core/replication/ReplicationManager.java | 81 ++++++++++++++------ .../impl/SharedNothingLiveActivation.java | 2 +- .../replication/ReplicationTest.java | 2 +- 6 files changed, 77 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java index a86c5c1..efb9aa6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java @@ -93,4 +93,11 @@ public interface Packet { * @return true if confirmation is required */ boolean isRequiresConfirmations(); + + + + /** The packe wasn't used because the stream is closed, + * this gives a chance to sub classes to cleanup anything that won't be used. */ + default void release() { + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 99c052b..afbaf53 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -354,6 +354,7 @@ public class PacketImpl implements Packet { return result; } + @Override public boolean equals(Object obj) { if (this == obj) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java index 4d3c32f..b81782b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java @@ -159,6 +159,16 @@ public final class ReplicationSyncFileMessage extends PacketImpl { if (dataSize > 0) { buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex()); } + + release(); + } + + @Override + public void release() { + if (byteBuffer != null) { + byteBuffer.release(); + byteBuffer = null; + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index e1027d4..d298a24 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -26,6 +26,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.ByteBuf; @@ -92,8 +93,7 @@ public final class ReplicationManager implements ActiveMQComponent { public boolean toBoolean() { return true; } - }, - ADD { + }, ADD { @Override public boolean toBoolean() { return false; @@ -129,6 +129,8 @@ public final class ReplicationManager implements ActiveMQComponent { private final long timeout; + private final long initialReplicationSyncTimeout; + private volatile boolean inSync = true; private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0); @@ -138,8 +140,10 @@ public final class ReplicationManager implements ActiveMQComponent { */ public ReplicationManager(CoreRemotingConnection remotingConnection, final long timeout, + final long initialReplicationSyncTimeout, final ExecutorFactory executorFactory) { this.executorFactory = executorFactory; + this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1); this.remotingConnection = remotingConnection; this.replicationStream = executorFactory.getExecutor(); @@ -178,7 +182,7 @@ public final class ReplicationManager implements ActiveMQComponent { boolean sync, final boolean lineUp) throws Exception { if (enabled) { - sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp, true); + sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp); } } @@ -339,10 +343,10 @@ public final class ReplicationManager implements ActiveMQComponent { } private OperationContext sendReplicatePacket(final Packet packet) { - return sendReplicatePacket(packet, true, true); + return sendReplicatePacket(packet, true); } - private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean useExecutor) { + private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) { if (!enabled) return null; boolean runItNow = false; @@ -353,22 +357,17 @@ public final class ReplicationManager implements ActiveMQComponent { } if (enabled) { - if (useExecutor) { - replicationStream.execute(() -> { - if (enabled) { - pendingTokens.add(repliToken); - flowControl(packet.expectedEncodeSize()); - replicatingChannel.send(packet); - } - }); - } else { - pendingTokens.add(repliToken); - flowControl(packet.expectedEncodeSize()); - replicatingChannel.send(packet); - } + replicationStream.execute(() -> { + if (enabled) { + pendingTokens.add(repliToken); + flowControl(packet.expectedEncodeSize()); + replicatingChannel.send(packet); + } + }); } else { // Already replicating channel failed, so just play the action now runItNow = true; + packet.release(); } // Execute outside lock @@ -396,7 +395,6 @@ public final class ReplicationManager implements ActiveMQComponent { } } - return flowWorked; } @@ -511,6 +509,24 @@ public final class ReplicationManager implements ActiveMQComponent { sendLargeFile(null, queueName, id, file, Long.MAX_VALUE); } + private class FlushAction implements Runnable { + + ReusableLatch latch = new ReusableLatch(1); + + public void reset() { + latch.setCount(1); + } + + public boolean await(long timeout, TimeUnit unit) throws Exception { + return latch.await(timeout, unit); + } + + @Override + public void run() { + latch.countDown(); + } + } + /** * Sends large files in reasonably sized chunks to the backup during replication synchronization. * @@ -532,15 +548,19 @@ public final class ReplicationManager implements ActiveMQComponent { file.open(); } int size = 32 * 1024; - final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); + + int flowControlSize = 10; + + int packetsSent = 0; + FlushAction action = new FlushAction(); try { - try (final FileInputStream fis = new FileInputStream(file.getJavaFile()); - final FileChannel channel = fis.getChannel()) { + try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) { // We can afford having a single buffer here for this entire loop // because sendReplicatePacket will encode the packet as a NettyBuffer // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy while (true) { + final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size); buffer.clear(); ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer(); final int bytesRead = channel.read(byteBuffer); @@ -558,18 +578,31 @@ public final class ReplicationManager implements ActiveMQComponent { // We cannot simply send everything of a file through the executor, // otherwise we would run out of memory. // so we don't use the executor here - sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, false); + sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true); + packetsSent++; + + if (packetsSent % flowControlSize == 0) { + flushReplicationStream(action); + } if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0) break; } } + flushReplicationStream(action); } finally { - buffer.release(); if (file.isOpen()) file.close(); } } + private void flushReplicationStream(FlushAction action) throws Exception { + action.reset(); + replicationStream.execute(action); + if (!action.await(this.timeout, TimeUnit.MILLISECONDS)) { + throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout); + } + } + /** * Reserve the following fileIDs in the backup server. * http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index c984ae2..b532e57 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -157,7 +157,7 @@ public class SharedNothingLiveActivation extends LiveActivation { ReplicationFailureListener listener = new ReplicationFailureListener(); rc.addCloseListener(listener); rc.addFailureListener(listener); - replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), activeMQServer.getExecutorFactory()); + replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getExecutorFactory()); replicationManager.start(); Thread t = new Thread(new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 398e895..46cb085 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -189,7 +189,7 @@ public final class ReplicationTest extends ActiveMQTestBase { setupServer(false); try { ClientSessionFactory sf = createSessionFactory(locator); - manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), factory); + manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory); addActiveMQComponent(manager); manager.start(); Assert.fail("Exception was expected");