ignite-user mailing list archives

From "juanma.cvega" <juanma.cv...@gmail.com>
Subject Clearing a distributed queue hangs after taking down one node
Date Fri, 26 Aug 2016 15:47:22 GMT
Hi,

I have come across a situation where a distributed queue hangs. My use case
requires a shared queue that is populated only by the oldest node in the
cluster. The queue is then consumed by that same node and by the other nodes
in the cluster (in my case, only one other). Every time the topology
changes, the queue has to be cleared and repopulated.

I wrote a simple test case to reproduce the issue. In this test, starting
two nodes works fine: when the second node joins, the first one clears the
queue and starts populating it again. The problem appears when any node is
stopped after that: the code hangs while trying to clear the queue.

Another problem I have is with the fairness of queue consumption. The
idea is for the nodes to shard the data by consuming from the queue, but
one node always ends up taking almost all of the data. In my example, this
can be reproduced by using the same sleep time in both the producer and the
consumer.

This is the code I used. Any idea?

Thanks. 

import com.google.common.collect.Lists;
import org.apache.ignite.*;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IgniteTest {

   public static void main(String... args) throws InterruptedException {
      Ignite ignite = getIgnite();

      IgniteQueue queue = getQueue(ignite);

      IgniteCluster cluster = ignite.cluster();

      IgniteEvents events = ignite.events();

      IgniteAtomicLong counter = ignite.atomicLong("counter", 0, true);

      ExecutorService consumerThread = Executors.newSingleThreadExecutor();
      consumerThread.execute(() -> {
         while (true) {
            System.out.println("Number taken: " + queue.take());
            try {
               Thread.sleep(3500);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
      });
      ExecutorService producerThread = Executors.newSingleThreadExecutor();
      producerThread.execute(() ->
            events.localListen(event -> {
               if (cluster.forOldest().node().isLocal()) {
                  System.out.println("Clearing...");
                  queue.clear();
                  System.out.println("I'm oldest!!");
               }
               return true;
            }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_JOINED,
               EventType.EVT_NODE_LEFT));
      // Stop both executors when the JVM shuts down.
      Runtime.getRuntime().addShutdownHook(new Thread(consumerThread::shutdownNow));
      Runtime.getRuntime().addShutdownHook(new Thread(producerThread::shutdownNow));
      while (true) {
         if (cluster.forOldest().node().isLocal()) {
            long current = counter.getAndIncrement();
            System.out.println("Number put: " + current);
            queue.put(current);
            Thread.sleep(1000);
         }
      }

   }

   private static IgniteQueue getQueue(Ignite ignite) {
      CollectionConfiguration cfg = new CollectionConfiguration();
      cfg.setBackups(1);
      cfg.setCollocated(false);
      return ignite.queue("queue", 0, cfg);
   }

   private static Ignite getIgnite() {
      IgniteConfiguration configuration = new IgniteConfiguration();
      TcpCommunicationSpi communicationSpi = new TcpCommunicationSpi();
      communicationSpi.setLocalPort(47200);
      communicationSpi.setLocalPortRange(2);
      TcpDiscoveryVmIpFinder tcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder();
      tcpDiscoveryVmIpFinder.setAddresses(Lists.newArrayList("localhost:47100..47101"));
      TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
      discoverySpi.setIpFinder(tcpDiscoveryVmIpFinder);
      discoverySpi.setLocalPort(47100);
      discoverySpi.setLocalPortRange(2);
      configuration.setDiscoverySpi(discoverySpi);
      configuration.setCommunicationSpi(communicationSpi);
      configuration.setGridName("PRICE_CHANGE_ALERTS_GRID");
      configuration.setIncludeEventTypes(EventType.EVT_NODE_FAILED,
            EventType.EVT_NODE_JOINED, EventType.EVT_NODE_LEFT);
      AtomicConfiguration atomicConfiguration = new AtomicConfiguration();
      atomicConfiguration.setCacheMode(CacheMode.REPLICATED);
      configuration.setAtomicConfiguration(atomicConfiguration);
      return Ignition.start(configuration);
   }
}




--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Clearing-a-distributed-queue-hangs-after-taking-down-one-node-tp7353.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.
