ignite-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Taras Ledkov <tled...@gridgain.com>
Subject Re: Problem with ReentranLocks on shutdown of one node in cluster
Date Mon, 05 Dec 2016 13:04:11 GMT
Hi,

Thanks a lot for the reproducible scenario and for the test.
I have created an issue to track this:
https://issues.apache.org/jira/browse/IGNITE-4369.

On Wed, Nov 30, 2016 at 7:35 PM, vladiisy <vbaianov@gmx.net> wrote:

> Hi,
>
> I have a problem with my application using reentrant locks: they seem to
> become corrupted after one node leaves my cluster. The threads using the
> locks hang in GridCacheLockImpl.unlock forever:
>
> ">>JUnit-Test-Worker-0" #160 prio=5 os_prio=0 tid=0x0000000020d57800
> nid=0x21f4 runnable [0x000000002e28e000]
>    java.lang.Thread.State: RUNNABLE
>         at java.lang.Thread.yield(Native Method)
>         at
> org.apache.ignite.internal.processors.datastructures.
> GridCacheLockImpl$Sync.tryRelease(GridCacheLockImpl.java:469)
>         at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.release(
> AbstractQueuedSynchronizer.java:1261)
>         at
> org.apache.ignite.internal.processors.datastructures.
> GridCacheLockImpl.unlock(GridCacheLockImpl.java:1296)
>         at
> com.iisy.solvatio.core.cluster.ignite.IgniteTests$
> Worker.run(IgniteTests.java:156)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ....
>
> I have a JUnit test, IgniteTests.java
> <http://apache-ignite-users.70518.x6.nabble.com/file/n9303/IgniteTests.java>
> , which shows the problem.
>
> Is this a bug in Ignite or a mistake on my side?
>
> Many thanks in advance,
> Vladimir
>
> Here is my JUnit test:
> ----------------------------------------------------
> package ignite.cluster.tests;
>
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.Collection;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ThreadFactory;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.ignite.Ignite;
> import org.apache.ignite.IgniteLock;
> import org.apache.ignite.IgniteSystemProperties;
> import org.apache.ignite.Ignition;
> import org.apache.ignite.cluster.ClusterNode;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.events.EventType;
> import org.apache.ignite.logger.slf4j.Slf4jLogger;
> import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
> import
> org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
> import org.junit.After;
> import org.junit.Assert;
> import org.junit.Before;
> import org.junit.Test;
>
> import com.google.common.util.concurrent.ThreadFactoryBuilder;
>
> public class IgniteTests {
>
>     private Ignite[] clusterNodes;
>
>     @Before
>     public void start() throws Exception {
>         int numNodes = 2;
>         this.clusterNodes = new Ignite[numNodes];
>         for (int i = 0; i < numNodes; i++) {
>             this.clusterNodes[i] = startClusterNode(i);
>         }
>         boolean ready = waitClusterHasNodes(this.clusterNodes[0],
> numNodes);
>         if (!ready) {
>             throw new IllegalStateException("Cluster not ready after 15
> seconds");
>         }
>     }
>
>     public boolean waitClusterHasNodes(final Ignite node, final int
> numNodes) throws InterruptedException {
>         boolean ready = false;
>         for (int i = 0; i < 15; i++) {
>             Collection<ClusterNode> nodes0 = node.cluster().nodes();
>             if (nodes0.size() == numNodes) {
>                 ready = true;
>                 break;
>             }
>             Thread.sleep(1000);
>         }
>         return ready;
>     }
>
>     private Ignite startClusterNode(final int nodeIndex) throws Exception {
>         String nodeName = "node" + nodeIndex;
>         List<String> addresses = Arrays.asList("127.0.0.1:48500..48502");
>
>         System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER,
> Boolean.toString(false));
>         IgniteConfiguration config = new IgniteConfiguration();
>
>         config.setClassLoader(this.getClass().getClassLoader());
>         config.setPeerClassLoadingEnabled(false);
>         config.setGridLogger(new Slf4jLogger());
>
>         config.setGridName(nodeName);
>         config.setConsistentId(nodeName);
>
>         config.setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_PUT);
>
>         Map<String, Object> userAttributes = new HashMap<>();
>         userAttributes.put("clusterName", "JUnit-Cluster");
>         config.setUserAttributes(userAttributes);
>
>         config.setMetricsLogFrequency(0);
>
>         TcpDiscoverySpi spi = new TcpDiscoverySpi();
>         TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
>         ipFinder.setAddresses(addresses);
>         config.setDiscoverySpi(spi);
>         config.setClientMode(false);
>
>         Ignite ignite = Ignition.start(config);
>         return ignite;
>     }
>
>     @After
>     public void stop() {
>         if (this.clusterNodes != null) {
>             for (int i = 0; i < this.clusterNodes.length; i++) {
>                 shutdown(i);
>             }
>         }
>     }
>
>     /**
>      * @param casesCount
>      * @return
>      */
>     protected ExecutorService createExecutor(final int casesCount) {
>         ThreadFactory factory = new
> ThreadFactoryBuilder().setNameFormat(">>JUnit-Test-Worker-%d").build();
>         return Executors.newFixedThreadPool(casesCount * 2, factory);
>     }
>
>     private void shutdown(final int nodeNum) {
>         Ignite node = this.clusterNodes[nodeNum];
>         if (node != null) {
>             boolean stoped =
> Ignition.stop(this.clusterNodes[nodeNum].name(), false);
>             Assert.assertTrue(stoped);
>
>             this.clusterNodes[nodeNum] = null;
>         }
>     }
>
>     private static class Worker implements Runnable {
>
>         private final int nr;
>         private final Ignite node;
>         private volatile boolean stoped = false;
>         private final CountDownLatch latch;
>
>         public Worker(final int nr, final Ignite node, final CountDownLatch
> latch) {
>             this.nr = nr;
>             this.node = node;
>             this.latch = latch;
>         }
>
>         @Override
>         public void run() {
>             try {
>                 while (!this.stoped) {
>                     IgniteLock lock = this.node.reentrantLock("Lock_" +
> this.nr, true, false, true);
>
>                     lock.lock();
>                     try {
>                         Thread.sleep(2);
>                     } catch (InterruptedException e) {
>                         // ignore
>                     }
>                     lock.unlock();
>                 }
>             } catch (Throwable e) {
>                 System.err.println("Error occured in worker " + this.nr +
> "
> on node " + this.node.name());
>                 e.printStackTrace();
>             }
>             this.latch.countDown();
>         }
>
>         public void stop() {
>             this.stoped = true;
>         }
>     }
>
>     @Test
>     public void testEvents() throws Exception {
>
>         final Ignite node0 = this.clusterNodes[0];
>         final Ignite node1 = this.clusterNodes[1];
>
>         final int workersPerNode = 100;
>         ExecutorService service = createExecutor(workersPerNode * 2);
>         try {
>             CountDownLatch latch0 = new CountDownLatch(workersPerNode);
>             List<Worker> workersNode0 = new ArrayList(workersPerNode);
>
>             CountDownLatch latch1 = new CountDownLatch(workersPerNode);
>             List<Worker> workersNode1 = new ArrayList(workersPerNode);
>             for (int i = 0; i < workersPerNode; i++) {
>                 {
>                     Worker workerNode0 = new Worker(i, node0, latch0);
>                     workersNode0.add(workerNode0);
>                     service.submit(workerNode0);
>                 }
>
>                 {
>                     Worker workerNode1 = new Worker(i, node1, latch1);
>                     workersNode1.add(workerNode1);
>                     service.submit(workerNode1);
>                 }
>             }
>             Thread.sleep(3000);
>             // stop worker on node1
>             workersNode1.forEach(w -> w.stop());
>             boolean downNormally = latch1.await(15, TimeUnit.SECONDS);
>             if (!downNormally) {
>                 Assert.fail("Workers are not ready, missing:" +
> latch1.getCount());
>             }
>             System.out.println("Stopping node 1");
>             shutdown(1);
>             System.out.println("Node 1 is down");
>
>             Thread.sleep(10000);
>             System.out.println("Stopping worker on node 0");
>             workersNode0.forEach(w -> w.stop());
>
>             boolean normally = latch0.await(15, TimeUnit.SECONDS);
>             if (!normally) {
>                 System.err.println("Problem occured, make thead dump...");
>                 Thread.sleep(60 * 1000);
>                 Assert.fail("Workers are not ready, missing: " +
> latch0.getCount());
>             }
>         } finally {
>             service.shutdown();
>         }
>     }
> }
>
>
>
>
>
>
>
>
> --
> View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Problem-with-ReentranLocks-on-shutdown-of-one-node-in-cluster-tp9303.html
> Sent from the Apache Ignite Users mailing list archive at Nabble.com.
>

Mime
View raw message