ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [16/22] ignite git commit: IGNITE-642 Implement IgniteReentrantLock data structure
Date Thu, 28 Apr 2016 11:29:16 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java
new file mode 100644
index 0000000..8fb9049
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteLockAbstractSelfTest.java
@@ -0,0 +1,1629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteCondition;
+import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLock;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+
+/**
+ * Cache reentrant lock self test.
+ */
+public abstract class IgniteLockAbstractSelfTest extends IgniteAtomicsAbstractTest
+    implements Externalizable {
+    /** */
+    private static final int NODES_CNT = 4;
+
+    /** */
+    protected static final int THREADS_CNT = 5;
+
+    /** */
+    private static final Random RND = new Random();
+
+    /** */
+    @Rule
+    public final ExpectedException exception = ExpectedException.none();
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return NODES_CNT;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReentrantLock() throws Exception {
+        checkReentrantLock(false);
+
+        checkReentrantLock(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailover() throws Exception {
+        if (atomicsCacheMode() == LOCAL)
+            return;
+
+        checkFailover(true, false);
+
+        checkFailover(false, false);
+
+        checkFailover(true, true);
+
+        checkFailover(false, true);
+    }
+
+    /**
+     * @param failoverSafe Failover safe flag.
+     * @throws Exception
+     */
+    private void checkFailover(final boolean failoverSafe, final boolean fair) throws Exception {
+        IgniteEx g = startGrid(NODES_CNT + 1);
+
+        // For vars locality.
+        {
+            // Ensure not exists.
+            assert g.reentrantLock("lock", failoverSafe, fair, false) == null;
+
+            IgniteLock lock  = g.reentrantLock("lock", failoverSafe, fair, true);
+
+            lock.lock();
+
+            assert lock.tryLock();
+
+            assertEquals(2, lock.getHoldCount());
+        }
+
+        Ignite g0 = grid(0);
+
+        final IgniteLock lock0 = g0.reentrantLock("lock", false, fair, false);
+
+        assert !lock0.tryLock();
+
+        assertEquals(0, lock0.getHoldCount());
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    try {
+                        lock0.lock();
+
+                        info("Acquired in separate thread.");
+
+                        // Lock is acquired silently only in failoverSafe mode.
+                        assertTrue(failoverSafe);
+
+                        lock0.unlock();
+
+                        info("Released lock in separate thread.");
+                    }
+                    catch (IgniteException e) {
+                        if (!failoverSafe)
+                            info("Ignored expected exception: " + e);
+                        else
+                            throw e;
+                    }
+                    return null;
+                }
+            },
+            1);
+
+        Thread.sleep(100);
+
+        g.close();
+
+        fut.get(500);
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkReentrantLock(final boolean fair) throws Exception {
+        // Test API.
+        checkLock(fair);
+
+        checkFailoverSafe(fair);
+
+        // Test main functionality.
+        IgniteLock lock1 = grid(0).reentrantLock("lock", true, fair, true);
+
+        assertFalse(lock1.isLocked());
+
+        lock1.lock();
+
+        IgniteCompute comp = grid(0).compute().withAsync();
+
+        comp.call(new IgniteCallable<Object>() {
+            @IgniteInstanceResource
+            private Ignite ignite;
+
+            @LoggerResource
+            private IgniteLogger log;
+
+            @Nullable @Override public Object call() throws Exception {
+                // Test reentrant lock in multiple threads on each node.
+                IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(
+                    new Callable<Object>() {
+                        @Nullable @Override public Object call() throws Exception {
+                            IgniteLock lock = ignite.reentrantLock("lock", true, fair, true);
+
+                            assert lock != null;
+
+                            log.info("Thread is going to wait on reentrant lock: " + Thread.currentThread().getName());
+
+                            assert lock.tryLock(1, MINUTES);
+
+                            log.info("Thread is again runnable: " + Thread.currentThread().getName());
+
+                            lock.unlock();
+
+                            return null;
+                        }
+                    },
+                    5,
+                    "test-thread"
+                );
+
+                fut.get();
+
+                return null;
+            }
+        });
+
+        IgniteFuture<Object> fut = comp.future();
+
+        Thread.sleep(3000);
+
+        assert lock1.isHeldByCurrentThread();
+
+        assert lock1.getHoldCount() == 1;
+
+        lock1.lock();
+
+        assert lock1.isHeldByCurrentThread();
+
+        assert lock1.getHoldCount() == 2;
+
+        lock1.unlock();
+
+        lock1.unlock();
+
+        // Ensure there are no hangs.
+        fut.get();
+
+        // Test operations on removed lock.
+        lock1.close();
+
+        checkRemovedReentrantLock(lock1);
+    }
+
+    /**
+     * @param lock IgniteLock.
+     * @throws Exception If failed.
+     */
+    protected void checkRemovedReentrantLock(final IgniteLock lock) throws Exception {
+        assert GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return lock.removed();
+            }
+        }, 5000);
+
+        assert lock.removed();
+    }
+
+    /**
+     * This method only checks if parameter of new reentrant lock is initialized properly.
+     * For tests considering failure recovery see @GridCachePartitionedNodeFailureSelfTest.
+     *
+     * @throws Exception Exception.
+     */
+    private void checkFailoverSafe(final boolean fair) throws Exception {
+        // Checks only if reentrant lock is initialized properly
+        IgniteLock lock = createReentrantLock("rmv", true, fair);
+
+        assert lock.isFailoverSafe();
+
+        removeReentrantLock("rmv", fair);
+
+        IgniteLock lock1 = createReentrantLock("rmv1", false, fair);
+
+        assert !lock1.isFailoverSafe();
+
+        removeReentrantLock("rmv1", fair);
+    }
+
+    /**
+     * @throws Exception Exception.
+     */
+    private void checkLock(final boolean fair) throws Exception {
+        // Check only 'false' cases here. Successful lock is tested over the grid.
+        final IgniteLock lock = createReentrantLock("acquire", false, fair);
+
+        lock.lock();
+
+        IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                assertNotNull(lock);
+
+                assert !lock.tryLock();
+
+                assert !lock.tryLock(10, MICROSECONDS);
+
+                return null;
+            }
+        });
+
+        fut.get();
+
+        lock.unlock();
+
+        removeReentrantLock("acquire", fair);
+    }
+
+    /**
+     * @param lockName Reentrant lock name.
+     * @param failoverSafe FailoverSafe flag.
+     * @param fair Fairness flag.
+     * @return New distributed reentrant lock.
+     * @throws Exception If failed.
+     */
+    private IgniteLock createReentrantLock(String lockName, boolean failoverSafe, boolean fair)
+        throws Exception {
+        IgniteLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, failoverSafe, fair, true);
+
+        // Test initialization.
+        assert lockName.equals(lock.name());
+        assert !lock.isLocked();
+        assert lock.isFailoverSafe() == failoverSafe;
+        assert lock.isFair() == fair;
+
+        return lock;
+    }
+
+    /**
+     * @param lockName Reentrant lock name.
+     * @throws Exception If failed.
+     */
+    private void removeReentrantLock(String lockName, final boolean fair)
+        throws Exception {
+        IgniteLock lock = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, fair, true);
+
+        assert lock != null;
+
+        // Remove lock on random node.
+        IgniteLock lock0 = grid(RND.nextInt(NODES_CNT)).reentrantLock(lockName, false, fair, true);
+
+        assertNotNull(lock0);
+
+        lock0.close();
+
+        // Ensure reentrant lock is removed on all nodes.
+        for (Ignite g : G.allGrids())
+            assertNull(((IgniteKernal)g).context().dataStructures().reentrantLock(lockName, false, fair, false));
+
+        checkRemovedReentrantLock(lock);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLockSerialization() throws Exception {
+        final IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+        info("Lock created: " + lock);
+
+        lock.isFailoverSafe();
+        lock.isFair();
+
+        grid(ThreadLocalRandom.current().nextInt(G.allGrids().size())).compute().broadcast(new IgniteCallable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                Thread.sleep(1000);
+
+                lock.lock();
+
+                try {
+                    info("Inside lock: " + lock.getHoldCount());
+                }
+                finally {
+                    lock.unlock();
+                }
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInitialization() throws Exception {
+        // Test #name() method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            assertEquals("lock", lock.name());
+
+            lock.close();
+        }
+
+        // Test #isFailoverSafe() method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            info("Lock created: " + lock);
+
+            assertTrue(lock.isFailoverSafe());
+
+            lock.close();
+        }
+
+        // Test #isFair() method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            assertTrue(lock.isFair());
+
+            lock.close();
+        }
+
+        // Test #isBroken() method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            assertFalse(lock.isBroken());
+
+            lock.close();
+        }
+
+        // Test #getOrCreateCondition(String ) method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            assertNotNull(lock.getOrCreateCondition("condition"));
+
+            lock.close();
+        }
+
+        // Test #getHoldCount() method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            assertEquals(0, lock.getHoldCount());
+
+            lock.close();
+        }
+
+        // Test #isHeldByCurrentThread() method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            assertFalse(lock.isHeldByCurrentThread());
+
+            lock.close();
+        }
+
+        // Test #isLocked() method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            assertFalse(lock.isLocked());
+
+            lock.close();
+        }
+
+        // Test #hasQueuedThreads() method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            assertFalse(lock.hasQueuedThreads());
+
+            lock.close();
+        }
+
+        // Test #hasQueuedThread(Thread ) method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            assertFalse(lock.hasQueuedThread(Thread.currentThread()));
+
+            lock.close();
+        }
+
+        // Test #hasWaiters(IgniteCondition ) method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            try {
+                IgniteCondition cond = grid(0).reentrantLock("lock2", true, true, true).getOrCreateCondition("cond");
+
+                lock.hasWaiters(cond);
+
+                fail("Condition not associated with this lock passed as argument.");
+            }
+            catch (IllegalArgumentException e) {
+                info("IllegalArgumentException thrown as it should be.");
+            }
+
+            try {
+                IgniteCondition cond = lock.getOrCreateCondition("condition");
+
+                lock.hasWaiters(cond);
+
+                fail("This method should throw exception when lock is not held.");
+            }
+            catch (IllegalMonitorStateException e) {
+                info("IllegalMonitorStateException thrown as it should be.");
+            }
+
+            lock.close();
+        }
+
+        // Test #getWaitQueueLength(IgniteCondition ) method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            try {
+                IgniteCondition cond = grid(0).reentrantLock("lock2", true, true, true).getOrCreateCondition("cond");
+
+                lock.getWaitQueueLength(cond);
+
+                fail("Condition not associated with this lock passed as argument.");
+            }
+            catch (IllegalArgumentException e) {
+                info("IllegalArgumentException thrown as it should be.");
+            }
+
+            try {
+                IgniteCondition cond = lock.getOrCreateCondition("condition");
+
+                lock.getWaitQueueLength(cond);
+
+                fail("This method should throw exception when lock is not held.");
+            }
+            catch (IllegalMonitorStateException e) {
+                info("IllegalMonitorStateException thrown as it should be.");
+            }
+
+            lock.close();
+        }
+
+        // Test #lock() method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            lock.lock();
+
+            lock.unlock();
+
+            lock.close();
+        }
+
+        // Test #lockInterruptibly() method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            lock.lockInterruptibly();
+
+            lock.unlock();
+
+            lock.close();
+        }
+
+        // Test #tryLock() method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            boolean success = lock.tryLock();
+
+            assertTrue(success);
+
+            lock.unlock();
+
+            lock.close();
+        }
+
+        // Test #tryLock(long, TimeUnit) method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            boolean success = lock.tryLock(1, MILLISECONDS);
+
+            assertTrue(success);
+
+            lock.unlock();
+
+            lock.close();
+        }
+
+        // Test #unlock() method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            try {
+                lock.unlock();
+
+                fail("This method should throw exception when lock is not held.");
+            }
+            catch (IllegalMonitorStateException e) {
+                info("IllegalMonitorStateException thrown as it should be.");
+            }
+
+            lock.close();
+        }
+
+        // Test #removed() method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            assertFalse(lock.removed());
+
+            lock.close();
+
+            assertTrue(lock.removed());
+        }
+
+        // Test #close() method.
+        {
+            IgniteLock lock = grid(0).reentrantLock("lock", true, true, true);
+
+            lock.close();
+
+            assertTrue(lock.removed());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReentrantLockMultinode1() throws Exception {
+        testReentrantLockMultinode1(false);
+
+        testReentrantLockMultinode1(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testReentrantLockMultinode1(final boolean fair) throws Exception {
+        if (gridCount() == 1)
+            return;
+
+        IgniteLock lock = grid(0).reentrantLock("s1", true, fair, true);
+
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        for (int i = 0; i < gridCount(); i++) {
+            final Ignite ignite = grid(i);
+
+            futs.add(GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    IgniteLock lock = ignite.reentrantLock("s1", true, fair, false);
+
+                    assertNotNull(lock);
+
+                    IgniteCondition cond1 = lock.getOrCreateCondition("c1");
+
+                    IgniteCondition cond2 = lock.getOrCreateCondition("c2");
+
+                    try {
+                        boolean wait = lock.tryLock(30_000, MILLISECONDS);
+
+                        assertTrue(wait);
+
+                        cond2.signal();
+
+                        cond1.await();
+                    }
+                    finally {
+                        lock.unlock();
+                    }
+
+                    return null;
+                }
+            }));
+        }
+
+        boolean done = false;
+
+        while(!done) {
+            done = true;
+
+            for (IgniteInternalFuture<?> fut : futs){
+                if(!fut.isDone())
+                    done = false;
+            }
+
+            try{
+                lock.lock();
+
+                lock.getOrCreateCondition("c1").signal();
+
+                lock.getOrCreateCondition("c2").await(10,MILLISECONDS);
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        for (IgniteInternalFuture<?> fut : futs)
+            fut.get(30_000);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLockInterruptibly() throws Exception {
+        testLockInterruptibly(false);
+
+        testLockInterruptibly(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testLockInterruptibly(final boolean fair) throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 2;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
+
+        lock0.lock();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    boolean isInterrupted = false;
+
+                    try {
+                        lock0.lockInterruptibly();
+                    }
+                    catch (IgniteInterruptedException e) {
+                        assertFalse(Thread.currentThread().isInterrupted());
+
+                        isInterrupted = true;
+                    }
+                    finally {
+                        // Assert that thread was interrupted.
+                        assertTrue(isInterrupted);
+
+                        // Assert that locked is still owned by main thread.
+                        assertTrue(lock0.isLocked());
+
+                        // Assert that this thread doesn't own the lock.
+                        assertFalse(lock0.isHeldByCurrentThread());
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        // Wait for all threads to attempt to acquire lock.
+        while (startedThreads.size() != totalThreads) {
+            Thread.sleep(1000);
+        }
+
+        for (Thread t : startedThreads)
+            t.interrupt();
+
+        fut.get();
+
+        lock0.unlock();
+
+        assertFalse(lock0.isLocked());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLockInterruptiblyMultinode() throws Exception {
+        testLockInterruptiblyMultinode(false);
+
+        testLockInterruptiblyMultinode(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testLockInterruptiblyMultinode(final boolean fair) throws Exception {
+        if (gridCount() == 1)
+            return;
+
+        // Initialize reentrant lock.
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        lock0.lock();
+
+        // Number of threads, one per node.
+        final int threadCount = gridCount();
+
+        final AtomicLong threadCounter = new AtomicLong(0);
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    final int localNodeId = (int)threadCounter.getAndIncrement();
+
+                    final Ignite grid = grid(localNodeId);
+
+                    IgniteClosure<Ignite, Void> closure = new IgniteClosure<Ignite, Void>() {
+                        @Override public Void apply(Ignite ignite) {
+                            final IgniteLock l = ignite.reentrantLock("lock", true, true, true);
+
+                            final AtomicReference<Thread> thread = new AtomicReference<>();
+
+                            final AtomicBoolean done = new AtomicBoolean(false);
+
+                            final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+
+                            final IgniteCountDownLatch latch = ignite.countDownLatch("latch", threadCount, false, true);
+
+                            IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+                                @Override public Void call() throws Exception {
+                                    try{
+                                        thread.set(Thread.currentThread());
+
+                                        l.lockInterruptibly();
+                                    }
+                                    catch(IgniteInterruptedException e){
+                                        exceptionThrown.set(true);
+                                    }
+                                    finally {
+                                        done.set(true);
+                                    }
+
+                                    return null;
+                                }
+                            });
+
+                            // Wait until l.lock() has been called.
+                            while(!l.hasQueuedThreads()){
+                                // No-op.
+                            }
+
+                            latch.countDown();
+
+                            latch.await();
+
+                            thread.get().interrupt();
+
+                            while(!done.get()){
+                                // No-op.
+                            }
+
+                            try {
+                                fut.get();
+                            }
+                            catch (IgniteCheckedException e) {
+                                fail(e.getMessage());
+
+                                throw new RuntimeException(e);
+                            }
+
+                            assertTrue(exceptionThrown.get());
+
+                            return null;
+                        }
+                    };
+
+                    closure.apply(grid);
+
+                    return null;
+                }
+            }, threadCount);
+
+        fut.get();
+
+        lock0.unlock();
+
+        info("Checking if interrupted threads are removed from global waiting queue...");
+
+        // Check if interrupted threads are removed from global waiting queue.
+        boolean locked = lock0.tryLock(1000, MILLISECONDS);
+
+        info("Interrupted threads successfully removed from global waiting queue. ");
+
+        assertTrue(locked);
+
+        lock0.unlock();
+
+        assertFalse(lock0.isLocked());
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLock() throws Exception {
+        testLock(false);
+
+        testLock(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testLock(final boolean fair) throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 2;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
+
+        lock0.lock();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    boolean isInterrupted = false;
+
+                    try {
+                        lock0.lock();
+                    }
+                    catch (IgniteInterruptedException e) {
+                        isInterrupted = true;
+
+                        fail("Lock() method is uninterruptible.");
+                    }
+                    finally {
+                        // Assert that thread was not interrupted.
+                        assertFalse(isInterrupted);
+
+                        // Assert that interrupted flag is set and clear it in order to call unlock().
+                        assertTrue(Thread.interrupted());
+
+                        // Assert that lock is still owned by this thread.
+                        assertTrue(lock0.isLocked());
+
+                        // Assert that this thread does own the lock.
+                        assertTrue(lock0.isHeldByCurrentThread());
+
+                        // Release lock.
+                        lock0.unlock();
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        // Wait for all threads to attempt to acquire lock.
+        while (startedThreads.size() != totalThreads) {
+            Thread.sleep(500);
+        }
+
+        for (Thread t : startedThreads)
+            t.interrupt();
+
+        lock0.unlock();
+
+        fut.get();
+
+        assertFalse(lock0.isLocked());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTryLock() throws Exception {
+        testTryLock(false);
+
+        testTryLock(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testTryLock(final boolean fair) throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 2;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
+
+        lock0.lock();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    boolean isInterrupted = false;
+
+                    boolean locked = false;
+
+                    try {
+                        locked = lock0.tryLock();
+                    }
+                    catch (IgniteInterruptedException e) {
+                        isInterrupted = true;
+
+                        fail("tryLock() method is uninterruptible.");
+                    }
+                    finally {
+                        // Assert that thread was not interrupted.
+                        assertFalse(isInterrupted);
+
+                        // Assert that lock is locked.
+                        assertTrue(lock0.isLocked());
+
+                        // Assert that this thread does own the lock.
+                        assertEquals(locked, lock0.isHeldByCurrentThread());
+
+                        // Release lock.
+                        if (locked)
+                            lock0.unlock();
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        // Wait for all threads to attempt to acquire lock.
+        while (startedThreads.size() != totalThreads) {
+            Thread.sleep(500);
+        }
+
+        for (Thread t : startedThreads)
+            t.interrupt();
+
+        fut.get();
+
+        lock0.unlock();
+
+        assertFalse(lock0.isLocked());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTryLockTimed() throws Exception {
+        testTryLockTimed(false);
+
+        testTryLockTimed(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testTryLockTimed(final boolean fair) throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 2;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
+
+        lock0.lock();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    boolean isInterrupted = false;
+
+                    boolean locked = false;
+
+                    try {
+                        locked = lock0.tryLock(100, TimeUnit.MILLISECONDS);
+                    }
+                    catch (IgniteInterruptedException e) {
+                        isInterrupted = true;
+                    }
+                    finally {
+                        // Assert that thread was not interrupted.
+                        assertFalse(isInterrupted);
+
+                        // Assert that tryLock returned false.
+                        assertFalse(locked);
+
+                        // Assert that lock is still owned by main thread.
+                        assertTrue(lock0.isLocked());
+
+                        // Assert that this thread doesn't own the lock.
+                        assertFalse(lock0.isHeldByCurrentThread());
+
+                        // Release lock.
+                        if (locked)
+                            lock0.unlock();
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        fut.get();
+
+        lock0.unlock();
+
+        assertFalse(lock0.isLocked());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConditionAwaitUninterruptibly() throws Exception {
+        testConditionAwaitUninterruptibly(false);
+
+        testConditionAwaitUninterruptibly(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testConditionAwaitUninterruptibly(final boolean fair) throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 2;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    boolean isInterrupted = false;
+
+                    lock0.lock();
+
+                    IgniteCondition cond = lock0.getOrCreateCondition("cond");
+
+                    try {
+                        cond.awaitUninterruptibly();
+                    }
+                    catch (IgniteInterruptedException e) {
+                        isInterrupted = true;
+                    }
+                    finally {
+                        // Assert that thread was not interrupted.
+                        assertFalse(isInterrupted);
+
+                        // Assert that lock is still locked.
+                        assertTrue(lock0.isLocked());
+
+                        // Assert that this thread does own the lock.
+                        assertTrue(lock0.isHeldByCurrentThread());
+
+                        // Clear interrupt flag.
+                        assertTrue(Thread.interrupted());
+
+                        // Release lock.
+                        if (lock0.isHeldByCurrentThread())
+                            lock0.unlock();
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        // Wait for all threads to attempt to acquire lock.
+        while (startedThreads.size() != totalThreads) {
+            Thread.sleep(500);
+        }
+
+        lock0.lock();
+
+        for (Thread t : startedThreads) {
+            t.interrupt();
+
+            lock0.getOrCreateCondition("cond").signal();
+        }
+
+        lock0.unlock();
+
+        fut.get();
+
+        assertFalse(lock0.isLocked());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConditionInterruptAwait() throws Exception {
+        testConditionInterruptAwait(false);
+
+        testConditionInterruptAwait(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testConditionInterruptAwait(final boolean fair) throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 2;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    boolean isInterrupted = false;
+
+                    lock0.lock();
+
+                    IgniteCondition cond = lock0.getOrCreateCondition("cond");
+
+                    try {
+                        cond.await();
+                    }
+                    catch (IgniteInterruptedException e) {
+                        isInterrupted = true;
+                    }
+                    finally {
+                        // Assert that thread was interrupted.
+                        assertTrue(isInterrupted);
+
+                        // Assert that lock is still locked.
+                        assertTrue(lock0.isLocked());
+
+                        // Assert that this thread does own the lock.
+                        assertTrue(lock0.isHeldByCurrentThread());
+
+                        // Release lock.
+                        if (lock0.isHeldByCurrentThread())
+                            lock0.unlock();
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        // Wait for all threads to attempt to acquire lock.
+        while (startedThreads.size() != totalThreads) {
+            Thread.sleep(500);
+        }
+
+        for (Thread t : startedThreads)
+            t.interrupt();
+
+        fut.get();
+
+        assertFalse(lock0.isLocked());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testHasQueuedThreads() throws Exception {
+        testHasQueuedThreads(false);
+
+        testHasQueuedThreads(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testHasQueuedThreads(final boolean fair) throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 5;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
+
+        final Set<Thread> finishedThreads = new GridConcurrentHashSet<>();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    startedThreads.add(Thread.currentThread());
+
+                    lock0.lock();
+
+                    // Wait until every thread tries to lock.
+                    do {
+                        Thread.sleep(1000);
+                    }
+                    while (startedThreads.size() != totalThreads);
+
+                    try {
+                        info("Acquired in separate thread. ");
+
+                        assertTrue(lock0.isHeldByCurrentThread());
+
+                        assertFalse(lock0.hasQueuedThread(Thread.currentThread()));
+
+                        finishedThreads.add(Thread.currentThread());
+
+                        if (startedThreads.size() != finishedThreads.size()) {
+                            assertTrue(lock0.hasQueuedThreads());
+                        }
+
+                        for (Thread t : startedThreads) {
+                            assertTrue(lock0.hasQueuedThread(t) != finishedThreads.contains(t));
+                        }
+                    }
+                    finally {
+                        lock0.unlock();
+
+                        assertFalse(lock0.isHeldByCurrentThread());
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        fut.get();
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testHasConditionQueuedThreads() throws Exception {
+        testHasConditionQueuedThreads(false);
+
+        testHasConditionQueuedThreads(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testHasConditionQueuedThreads(final boolean fair) throws Exception {
+        final IgniteLock lock0 = grid(0).reentrantLock("lock", true, fair, true);
+
+        assertEquals(0, lock0.getHoldCount());
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        final int totalThreads = 5;
+
+        final Set<Thread> startedThreads = new GridConcurrentHashSet<>();
+
+        final Set<Thread> finishedThreads = new GridConcurrentHashSet<>();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    assertFalse(lock0.isHeldByCurrentThread());
+
+                    IgniteCondition cond = lock0.getOrCreateCondition("cond");
+
+                    lock0.lock();
+
+                    startedThreads.add(Thread.currentThread());
+
+                    // Wait until every thread tries to lock.
+                    do {
+                        cond.await();
+
+                        Thread.sleep(1000);
+                    }
+                    while (startedThreads.size() != totalThreads);
+
+                    try {
+                        info("Acquired in separate thread. Number of threads waiting on condition: "
+                            + lock0.getWaitQueueLength(cond));
+
+                        assertTrue(lock0.isHeldByCurrentThread());
+
+                        assertFalse(lock0.hasQueuedThread(Thread.currentThread()));
+
+                        finishedThreads.add(Thread.currentThread());
+
+                        if (startedThreads.size() != finishedThreads.size()) {
+                            assertTrue(lock0.hasWaiters(cond));
+                        }
+
+                        for (Thread t : startedThreads) {
+                            if (!finishedThreads.contains(t))
+                                assertTrue(lock0.hasWaiters(cond));
+                        }
+
+                        assertTrue(lock0.getWaitQueueLength(cond) == (startedThreads.size() - finishedThreads.size()));
+                    }
+                    finally {
+                        cond.signal();
+
+                        lock0.unlock();
+
+                        assertFalse(lock0.isHeldByCurrentThread());
+                    }
+
+                    return null;
+                }
+            }, totalThreads);
+
+        IgniteCondition cond = lock0.getOrCreateCondition("cond");
+
+        lock0.lock();
+
+        try {
+            // Wait until all threads are waiting on condition.
+            while (lock0.getWaitQueueLength(cond) != totalThreads) {
+                lock0.unlock();
+
+                Thread.sleep(1000);
+
+                lock0.lock();
+            }
+
+            // Signal once to get things started.
+            cond.signal();
+        }
+        finally {
+            lock0.unlock();
+        }
+
+        fut.get();
+
+        assertFalse(lock0.hasQueuedThreads());
+
+        for (Thread t : startedThreads)
+            assertFalse(lock0.hasQueuedThread(t));
+
+        lock0.close();
+    }
+
+    /**
+     * Tests if lock is evenly acquired among nodes when fair flag is set on.
+     * Since exact ordering of lock acquisitions cannot be guaranteed because it also depends
+     * on the OS thread scheduling, certain deviation from uniform distribution is tolerated.
+     * @throws Exception If failed.
+     */
+    public void testFairness() throws Exception {
+        if (gridCount() == 1)
+            return;
+
+        // Total number of ops.
+        final long opsCount = 10000;
+
+        // Allowed deviation from uniform distribution.
+        final double tolerance = 0.05;
+
+        // Shared counter.
+        final String OPS_COUNTER = "ops_counter";
+
+        // Number of threads, one per node.
+        final int threadCount = gridCount();
+
+        final AtomicLong threadCounter = new AtomicLong(0);
+
+        Ignite ignite = startGrid(gridCount());
+
+        // Initialize reentrant lock.
+        IgniteLock l = ignite.reentrantLock("lock", true, true, true);
+
+        // Initialize OPS_COUNTER.
+        ignite.getOrCreateCache(OPS_COUNTER).put(OPS_COUNTER, (long)0);
+
+        final Map<Integer, Long> counts = new ConcurrentHashMap<>();
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    final int localNodeId = (int)threadCounter.getAndIncrement();
+
+                    final Ignite grid = grid(localNodeId);
+
+                    IgniteClosure<Ignite, Long> closure = new IgniteClosure<Ignite, Long>() {
+                        @Override public Long apply(Ignite ignite) {
+                            IgniteLock l = ignite.reentrantLock("lock", true, true, true);
+
+                            long localCount = 0;
+
+                            IgniteCountDownLatch latch = ignite.countDownLatch("latch", threadCount, false, true);
+
+                            latch.countDown();
+
+                            latch.await();
+
+                            while(true){
+                                l.lock();
+
+                                try {
+                                    long opsCounter = (long) ignite.getOrCreateCache(OPS_COUNTER).get(OPS_COUNTER);
+
+                                    if(opsCounter == opsCount)
+                                        break;
+
+                                    ignite.getOrCreateCache(OPS_COUNTER).put(OPS_COUNTER, ++opsCounter);
+
+                                    localCount++;
+
+                                    if(localCount > 1000){
+                                        assertTrue(localCount < (1 + tolerance) * opsCounter / threadCount);
+
+                                        assertTrue(localCount > (1 - tolerance) * opsCounter / threadCount);
+                                    }
+
+                                    if(localCount % 100 == 0) {
+                                        info("Node [id=" +ignite.cluster().localNode().id() + "] acquired " +
+                                            localCount + " times. " + "Total ops count: " +
+                                            opsCounter + "/" + opsCount +"]");
+                                    }
+                                }
+                                finally {
+                                    l.unlock();
+                                }
+                            }
+
+                            return localCount;
+                        }
+                    };
+
+                    long localCount = closure.apply(grid);
+
+                    counts.put(localNodeId, localCount);
+
+                    return null;
+                }
+            }, threadCount);
+
+        fut.get();
+
+        long totalSum = 0;
+
+        for(int i=0; i<gridCount(); i++){
+
+            totalSum += counts.get(i);
+
+            info("Node " + grid(i).localNode().id() + " acquired the lock " + counts.get(i) + " times. ");
+        }
+
+        assertEquals(totalSum, opsCount);
+
+        l.close();
+
+        ignite.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
index e60aed3..200e276 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
@@ -77,6 +77,7 @@ public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstr
      */
     public void testSemaphore() throws Exception {
         checkSemaphore();
+        checkSemaphoreSerialization();
     }
 
     /**
@@ -231,6 +232,36 @@ public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstr
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    private void checkSemaphoreSerialization() throws Exception {
+        final IgniteSemaphore sem = grid(0).semaphore("semaphore", -gridCount() + 1, true, true);
+
+        assertEquals(-gridCount() + 1, sem.availablePermits());
+
+        grid(0).compute().broadcast(new IgniteCallable<Object>() {
+            @Nullable @Override public Object call() throws Exception {
+                sem.release();
+
+                return null;
+            }
+        });
+
+        assert sem.availablePermits() == 1;
+
+        sem.acquire();
+
+        assert sem.availablePermits() == 0;
+
+        sem.release();
+
+        // Test operations on removed semaphore.
+        sem.close();
+
+        checkRemovedSemaphore(sem);
+    }
+
+    /**
      * @param semaphore Semaphore.
      * @throws Exception If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java
new file mode 100644
index 0000000..7e1a11c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalLockSelfTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures.local;
+
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteLock;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteLockAbstractSelfTest;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+
+/**
+ *
+ */
+public class IgniteLocalLockSelfTest extends IgniteLockAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode atomicsCacheMode() {
+        return LOCAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testReentrantLock() throws Exception {
+        // Test main functionality.
+        IgniteLock lock = grid(0).reentrantLock("lock", true, false, true);
+
+        assertNotNull(lock);
+
+        assertEquals(0, lock.getHoldCount());
+
+        lock.lock();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(
+            new Callable<Object>() {
+                @Nullable @Override public Object call() throws Exception {
+                    IgniteLock lock = grid(0).reentrantLock("lock", true, false, true);
+
+                    assert lock != null;
+
+                    info("Thread is going to wait on lock: " + Thread.currentThread().getName());
+
+                    assert lock.tryLock(1, MINUTES);
+
+                    info("Thread is again runnable: " + Thread.currentThread().getName());
+
+                    lock.unlock();
+
+                    return null;
+                }
+            },
+            THREADS_CNT,
+            "test-thread"
+        );
+
+        Thread.sleep(3000);
+
+        assert lock.isLocked();
+
+        assert lock.getHoldCount() == 1;
+
+        lock.lock();
+
+        assert lock.isLocked();
+
+        assert lock.getHoldCount() == 2;
+
+        lock.unlock();
+
+        assert lock.isLocked();
+
+        assert lock.getHoldCount() == 1;
+
+        lock.unlock();
+
+        // Ensure there are no hangs.
+        fut.get();
+
+        // Test operations on removed lock.
+        IgniteLock lock0 = grid(0).reentrantLock("lock", true, false, false);
+
+        assertNotNull(lock0);
+
+        lock0.close();
+
+        checkRemovedReentrantLock(lock0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedLockSelfTest.java
new file mode 100644
index 0000000..787f1e3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedLockSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures.partitioned;
+
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteLockAbstractSelfTest;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ *
+ */
+public class IgnitePartitionedLockSelfTest extends IgniteLockAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode atomicsCacheMode() {
+        return PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedLockSelfTest.java
new file mode 100644
index 0000000..00bb0fa
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedLockSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.datastructures.replicated;
+
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.processors.cache.datastructures.IgniteLockAbstractSelfTest;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ *
+ */
+public class IgniteReplicatedLockSelfTest extends IgniteLockAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode atomicsCacheMode() {
+        return REPLICATED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java
index ae591bd..43acc41 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheLockAbstractTest.java
@@ -573,4 +573,4 @@ public abstract class GridCacheLockAbstractTest extends GridCommonAbstractTest {
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
index d4ca9a5..18537c5 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteAtomicStamped;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteLock;
 import org.apache.ignite.IgniteQueue;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteSystemProperties;
@@ -64,6 +65,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
     /** Semaphore name. */
     private static final String TEST_SEMAPHORE_NAME = "test-semaphore";
 
+    /** Reentrant lock name. */
+    private static final String TEST_REENTRANT_LOCK_NAME = "test-reentrant-lock";
+
     /** */
     private static final CollectionConfiguration colCfg = new CollectionConfiguration();
 
@@ -98,6 +102,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
     private static final boolean SEMAPHORE = true;
 
     /** */
+    private static final boolean REENTRANTLOCK = true;
+
+    /** */
     private GridCacheDataStructuresLoadTest() {
         // No-op
     }
@@ -347,6 +354,44 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
             }
         };
 
+
+    /** Reentrant lock read closure. */
+    private final CIX1<Ignite> reentrantLockReadClos =
+        new CIX1<Ignite>() {
+            @Override public void applyx(Ignite ignite) {
+                IgniteLock r = ignite.reentrantLock(TEST_REENTRANT_LOCK_NAME, true, false, true);
+
+                for (int i = 0; i < operationsPerTx; i++) {
+                    r.isLocked();
+
+                    long cnt = reads.incrementAndGet();
+
+                    if (cnt % READ_LOG_MOD == 0)
+                        info("Performed " + cnt + " reads.");
+                }
+            }
+        };
+
+    /** Reentrant lock write closure. */
+    private final CIX1<Ignite> reentrantLockWriteClos =
+        new CIX1<Ignite>() {
+            @Override public void applyx(Ignite ignite) {
+                IgniteLock r = ignite.reentrantLock(TEST_REENTRANT_LOCK_NAME, true, false, true);
+
+                for (int i = 0; i < operationsPerTx; i++) {
+                    if ((i % 2) == 0)
+                        r.lock();
+                    else
+                        r.unlock();
+
+                    long cnt = writes.incrementAndGet();
+
+                    if (cnt % WRITE_LOG_MOD == 0)
+                        info("Performed " + cnt + " writes.");
+                }
+            }
+        };
+
     /**
      * @param args Arguments.
      * @throws IgniteCheckedException In case of error.
@@ -417,6 +462,14 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad
 
                 test.loadTestIgnite(test.semaphoreWriteClos, test.semaphoreReadClos);
             }
+
+            System.gc();
+
+            if (REENTRANTLOCK) {
+                info("Testing reentrant lock...");
+
+                test.loadTestIgnite(test.reentrantLockWriteClos, test.reentrantLockReadClos);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index c49c730..c9859fc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCluster;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteLock;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteEvents;
@@ -367,6 +368,15 @@ public class IgniteMock implements Ignite {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteLock reentrantLock(String name,
+        boolean failoverSafe,
+        boolean fair,
+        boolean create)
+    {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public <T> IgniteQueue<T> queue(String name,
         int cap,
         CollectionConfiguration cfg)

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index a2e0d5a..2598bc5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -42,6 +42,7 @@ import org.apache.ignite.IgniteIllegalStateException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteLock;
 import org.apache.ignite.IgniteScheduler;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteServices;
@@ -545,6 +546,12 @@ public class IgniteProcessProxy implements IgniteEx {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteLock reentrantLock(String name, boolean failoverSafe,
+        boolean fair, boolean create) throws IgniteException {
+        throw new UnsupportedOperationException("Operation isn't supported yet.");
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> IgniteQueue<T> queue(String name, int cap,
         @Nullable CollectionConfiguration cfg) throws IgniteException {
         throw new UnsupportedOperationException("Operation isn't supported yet.");

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
index e663a99..59a18e5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.datastructures.local.GridCach
 import org.apache.ignite.internal.processors.cache.datastructures.local.GridCacheLocalSetSelfTest;
 import org.apache.ignite.internal.processors.cache.datastructures.local.IgniteLocalAtomicLongApiSelfTest;
 import org.apache.ignite.internal.processors.cache.datastructures.local.IgniteLocalCountDownLatchSelfTest;
+import org.apache.ignite.internal.processors.cache.datastructures.local.IgniteLocalLockSelfTest;
 import org.apache.ignite.internal.processors.cache.datastructures.local.IgniteLocalSemaphoreSelfTest;
 import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedAtomicOffheapQueueApiSelfTest;
 import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedAtomicOffheapQueueCreateMultiNodeSelfTest;
@@ -66,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.datastructures.partitioned.Gr
 import org.apache.ignite.internal.processors.cache.datastructures.partitioned.GridCachePartitionedSetSelfTest;
 import org.apache.ignite.internal.processors.cache.datastructures.partitioned.IgnitePartitionedAtomicLongApiSelfTest;
 import org.apache.ignite.internal.processors.cache.datastructures.partitioned.IgnitePartitionedCountDownLatchSelfTest;
+import org.apache.ignite.internal.processors.cache.datastructures.partitioned.IgnitePartitionedLockSelfTest;
 import org.apache.ignite.internal.processors.cache.datastructures.partitioned.IgnitePartitionedQueueNoBackupsTest;
 import org.apache.ignite.internal.processors.cache.datastructures.partitioned.IgnitePartitionedSemaphoreSelfTest;
 import org.apache.ignite.internal.processors.cache.datastructures.partitioned.IgnitePartitionedSetNoBackupsSelfTest;
@@ -80,6 +82,7 @@ import org.apache.ignite.internal.processors.cache.datastructures.replicated.Gri
 import org.apache.ignite.internal.processors.cache.datastructures.replicated.GridCacheReplicatedSetSelfTest;
 import org.apache.ignite.internal.processors.cache.datastructures.replicated.IgniteReplicatedAtomicLongApiSelfTest;
 import org.apache.ignite.internal.processors.cache.datastructures.replicated.IgniteReplicatedCountDownLatchSelfTest;
+import org.apache.ignite.internal.processors.cache.datastructures.replicated.IgniteReplicatedLockSelfTest;
 import org.apache.ignite.internal.processors.cache.datastructures.replicated.IgniteReplicatedSemaphoreSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheAtomicReplicatedNodeRestartSelfTest;
 
@@ -107,6 +110,7 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(GridCacheLocalAtomicQueueApiSelfTest.class));
         suite.addTest(new TestSuite(IgniteLocalCountDownLatchSelfTest.class));
         suite.addTest(new TestSuite(IgniteLocalSemaphoreSelfTest.class));
+        suite.addTest(new TestSuite(IgniteLocalLockSelfTest.class));
 
         suite.addTest(new TestSuite(GridCacheReplicatedSequenceApiSelfTest.class));
         suite.addTest(new TestSuite(GridCacheReplicatedSequenceMultiNodeSelfTest.class));
@@ -117,6 +121,7 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(GridCacheReplicatedDataStructuresFailoverSelfTest.class));
         suite.addTest(new TestSuite(IgniteReplicatedCountDownLatchSelfTest.class));
         suite.addTest(new TestSuite(IgniteReplicatedSemaphoreSelfTest.class));
+        suite.addTest(new TestSuite(IgniteReplicatedLockSelfTest.class));
         suite.addTest(new TestSuite(IgniteCacheAtomicReplicatedNodeRestartSelfTest.class));
 
         suite.addTest(new TestSuite(GridCachePartitionedSequenceApiSelfTest.class));
@@ -139,6 +144,7 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(IgnitePartitionedCountDownLatchSelfTest.class));
         suite.addTest(new TestSuite(IgniteDataStructureWithJobTest.class));
         suite.addTest(new TestSuite(IgnitePartitionedSemaphoreSelfTest.class));
+        suite.addTest(new TestSuite(IgnitePartitionedLockSelfTest.class));
 
         suite.addTest(new TestSuite(GridCachePartitionedSetFailoverSelfTest.class));
         suite.addTest(new TestSuite(GridCachePartitionedOffheapSetFailoverSelfTest.class));

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
index 92d8c1d..03c7b0e 100644
--- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
+++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
@@ -425,6 +425,17 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteLock reentrantLock(String name,
+        boolean failoverSafe,
+        boolean fair,
+        boolean create)
+    {
+        assert g != null;
+
+        return g.reentrantLock(name, failoverSafe, create, fair);
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public <T> IgniteQueue<T> queue(String name,
         int cap,
         CollectionConfiguration cfg)


Mime
View raw message