activemq-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (ARTEMIS-1570) SharedNothingBackup does not replicate all journal from live
Date Thu, 18 Jan 2018 17:46:00 GMT

    [ https://issues.apache.org/jira/browse/ARTEMIS-1570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330839#comment-16330839
] 

ASF GitHub Bot commented on ARTEMIS-1570:
-----------------------------------------

Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1742#discussion_r162418130
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java
---
    @@ -0,0 +1,388 @@
    +package org.apache.activemq.artemis.tests.integration.replication;
    +
    +
    +import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
    +import org.apache.activemq.artemis.api.core.ActiveMQException;
    +import org.apache.activemq.artemis.api.core.Message;
    +import org.apache.activemq.artemis.api.core.RoutingType;
    +import org.apache.activemq.artemis.api.core.client.*;
    +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
    +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
    +import org.apache.activemq.artemis.core.config.Configuration;
    +import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
    +import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
    +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
    +import org.apache.activemq.artemis.core.io.SequentialFileFactory;
    +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
    +import org.apache.activemq.artemis.core.journal.LoaderCallback;
    +import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
    +import org.apache.activemq.artemis.core.journal.RecordInfo;
    +import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
    +import org.apache.activemq.artemis.core.message.impl.CoreMessage;
    +import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
    +import org.apache.activemq.artemis.core.persistence.Persister;
    +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
    +import org.apache.activemq.artemis.core.server.ActiveMQServer;
    +import org.apache.activemq.artemis.core.server.ActiveMQServers;
    +import org.apache.activemq.artemis.core.server.JournalType;
    +import org.apache.activemq.artemis.junit.Wait;
    +import org.jboss.logging.Logger;
    +import org.junit.*;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +public class SharedNothingReplicationTest {
    +    private static final Logger logger = Logger.getLogger(SharedNothingReplicationTest.class);
    +
    +    @Rule
    +    public TemporaryFolder brokersFolder = new TemporaryFolder();
    +
    +    private SlowMessagePersister slowMessagePersister;
    +
    +    @Before
    +    public void setUp() throws Exception {
    +        slowMessagePersister = new SlowMessagePersister();
    +        CoreMessagePersister.theInstance = slowMessagePersister;
    +    }
    +
    +    @After
    +    public void tearDown() throws Exception {
    +        if (slowMessagePersister != null) {
    +            CoreMessagePersister.theInstance = slowMessagePersister.persister;
    +        }
    +    }
    +
    +    @Test
    +    public void testReplicateFromSlowLive() throws Exception {
    +        // start live
    +        Configuration liveConfiguration = createLiveConfiguration();
    +        ActiveMQServer liveServer = ActiveMQServers.newActiveMQServer(liveConfiguration);
    +        liveServer.start();
    +
    +        Wait.waitFor(() -> liveServer.isStarted());
    +
    +        CoreMessagePersister.theInstance = SlowMessagePersister._getInstance();
    +
    +        final CountDownLatch replicated = new CountDownLatch(1);
    +
    +        ServerLocator locator = ServerLocatorImpl.newLocator("tcp://localhost:61616");
    +        locator.setCallTimeout(60_000L);
    +        locator.setConnectionTTL(60_000L);
    +        locator.addClusterTopologyListener(new ClusterTopologyListener() {
    +            @Override
    +            public void nodeUP(TopologyMember member, boolean last) {
    +                logger.infof("nodeUP fired last=%s, live=%s, backup=%s", last, member.getLive(),
member.getBackup());
    +                if (member.getBackup() != null) {
    +                    replicated.countDown();
    +                }
    +            }
    +
    +            @Override
    +            public void nodeDown(long eventUID, String nodeID) {
    +
    +            }
    +        });
    +
    +        final ClientSessionFactory csf = locator.createSessionFactory();
    +        ClientSession sess = csf.createSession();
    +        sess.createQueue("slow", RoutingType.ANYCAST, "slow", true);
    +        sess.close();
    +        Executor sendMessageExecutor = Executors.newCachedThreadPool();
    +
    +
    +        // let's write some messages
    +        int i = 0;
    +        final int j = 50;
    +        final CountDownLatch allMessageSent = new CountDownLatch(j);
    +        while (i < 5) {
    +            sendMessageExecutor.execute(() -> {
    +                try {
    +                    ClientSession session = csf.createSession(true, true);
    +                    ClientProducer producer = session.createProducer("slow");
    +                    ClientMessage message = session.createMessage(true);
    +                    // this will make journal's append executor busy
    +                    message.putLongProperty("delay", Long.getLong("message.property.delay",500L));
    --- End diff --
    
    Where? How does this property affect the semantics of the server? I couldn't find it used anywhere.


> SharedNothingBackup does not replicate all journal from live
> ------------------------------------------------------------
>
>                 Key: ARTEMIS-1570
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-1570
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 2.4.0
>         Environment: I'm running the unit test on Windows.
>            Reporter: shoukun huai
>            Priority: Critical
>         Attachments: SharedNothingReplicationTest.java
>
>
> I am trying to test replication when the live server is under heavy IO load.
> Attached is my JUnit test.
> The test uses a slow message persister to simulate the live server being busy with IO, so that JournalImpl's
`appendExecutor` is busy.
> After starting the live server, it sends 5 messages, each with a `delay` property of 5000 ms, then
starts the backup server and waits until it is replicated. It then sends more messages without a delay.
> It stops the live and backup servers after all messages are sent, then checks the message journal.
> The backup will be missing 2 messages/journal entries.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message