From dev-return-63515-archive-asf-public=cust-asf.ponee.io@activemq.apache.org Thu Jan 18 18:45:23 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 03D39180654 for ; Thu, 18 Jan 2018 18:45:23 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id E7DC3160C2B; Thu, 18 Jan 2018 17:45:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 38D05160C26 for ; Thu, 18 Jan 2018 18:45:22 +0100 (CET) Received: (qmail 24665 invoked by uid 500); 18 Jan 2018 17:45:21 -0000 Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list dev@activemq.apache.org Received: (qmail 24647 invoked by uid 99); 18 Jan 2018 17:45:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Jan 2018 17:45:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E5FB5DFBAA; Thu, 18 Jan 2018 17:45:20 +0000 (UTC) From: clebertsuconic To: dev@activemq.apache.org Reply-To: dev@activemq.apache.org References: In-Reply-To: Subject: [GitHub] activemq-artemis pull request #1742: ARTEMIS-1570 Flush appendExecutor befor... Content-Type: text/plain Message-Id: <20180118174520.E5FB5DFBAA@git1-us-west.apache.org> Date: Thu, 18 Jan 2018 17:45:20 +0000 (UTC) Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/1742#discussion_r162418130 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java --- @@ -0,0 +1,388 @@ +package org.apache.activemq.artemis.tests.integration.replication; + + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.client.*; +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.LoaderCallback; +import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; +import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.core.journal.impl.JournalImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; +import org.apache.activemq.artemis.core.persistence.Persister; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.junit.Wait; +import org.jboss.logging.Logger; +import org.junit.*; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class SharedNothingReplicationTest { + private static final Logger logger = Logger.getLogger(SharedNothingReplicationTest.class); + + @Rule + public TemporaryFolder brokersFolder = new TemporaryFolder(); + + private SlowMessagePersister slowMessagePersister; + + @Before + public void setUp() throws Exception { + slowMessagePersister = new SlowMessagePersister(); + CoreMessagePersister.theInstance = slowMessagePersister; + } + + @After + public void tearDown() throws Exception { + if (slowMessagePersister != null) { + CoreMessagePersister.theInstance = slowMessagePersister.persister; + } + } + + @Test + public void testReplicateFromSlowLive() throws Exception { + // start live + Configuration liveConfiguration = createLiveConfiguration(); + ActiveMQServer liveServer = ActiveMQServers.newActiveMQServer(liveConfiguration); + liveServer.start(); + + Wait.waitFor(() -> liveServer.isStarted()); + + CoreMessagePersister.theInstance = SlowMessagePersister._getInstance(); + + final CountDownLatch replicated = new CountDownLatch(1); + + ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616"); + locator.setCallTimeout(60_000L); + locator.setConnectionTTL(60_000L); + locator.addClusterTopologyListener(new ClusterTopologyListener() { + @Override + public void nodeUP(TopologyMember member, boolean last) { + logger.infof("nodeUP fired last=%s, live=%s, backup=%s", last, member.getLive(), member.getBackup()); + if (member.getBackup() != null) { + replicated.countDown(); + } + } + + @Override + public void nodeDown(long eventUID, String nodeID) { + + } + }); + + final ClientSessionFactory csf = locator.createSessionFactory(); + ClientSession sess = csf.createSession(); + sess.createQueue("slow", RoutingType.ANYCAST, "slow", true); + sess.close(); + Executor sendMessageExecutor = Executors.newCachedThreadPool(); + + + // let's write some messages + int i = 0; + final int j = 50; + final CountDownLatch allMessageSent = new CountDownLatch(j); + while (i < 5) { + sendMessageExecutor.execute(() -> { + try { + ClientSession session = csf.createSession(true, true); + ClientProducer producer = session.createProducer("slow"); + ClientMessage message = session.createMessage(true); + // this will make journal's append executor busy + message.putLongProperty("delay", Long.getLong("message.property.delay",500L)); --- End diff -- Where? how this property is affecting the semantic of the server. I didn't find it anywhere. ---