ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [18/22] ignite git commit: IGNITE-642 Implement IgniteReentrantLock data structure
Date Thu, 28 Apr 2016 11:28:19 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
new file mode 100644
index 0000000..3ab7289
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
@@ -0,0 +1,1538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.InvalidObjectException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.ObjectStreamException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCondition;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLock;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.transactions.TransactionRollbackException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Cache reentrant lock implementation based on AbstractQueuedSynchronizer.
+ */
+public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Deserialization stash. */
+    private static final ThreadLocal<String> stash = new ThreadLocal<>();
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Reentrant lock name. */
+    private String name;
+
+    /** Removed flag. */
+    private volatile boolean rmvd;
+
+    /** Reentrant lock key. */
+    private GridCacheInternalKey key;
+
+    /** Reentrant lock projection. */
+    private IgniteInternalCache<GridCacheInternalKey, GridCacheLockState> lockView;
+
+    /** Cache context. */
+    private final GridCacheContext ctx;
+
+    /** Initialization guard. */
+    private final AtomicBoolean initGuard = new AtomicBoolean();
+
+    /** Initialization latch. */
+    private final CountDownLatch initLatch = new CountDownLatch(1);
+
+    /** Lock that provides non-overlapping processing of updates. */
+    private Lock updateLock = new ReentrantLock();
+
+    /** Internal synchronization object. */
+    private Sync sync;
+
+    /** Flag indicating that every operation on this lock should be interrupted. */
+    private volatile boolean interruptAll;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridCacheLockImpl() {
+        // This instance should never be used directly.
+        ctx = null;
+    }
+
+    /**
+     * Synchronization implementation for reentrant lock using AbstractQueuedSynchronizer.
+     */
+    @SuppressWarnings({"CallToThreadYield", "CallToSignalInsteadOfSignalAll"})
+    private class Sync extends AbstractQueuedSynchronizer {
+        /** */
+        private static final long serialVersionUID = 1192457210091910933L;
+
+        /** */
+        private static final long LOCK_FREE = 0;
+
+        /** Map containing condition objects. */
+        private Map<String, ConditionObject> conditionMap;
+
+        /** List of condition signal calls on this node. */
+        private Map<String, Integer> outgoingSignals;
+
+        /** Last condition waited on. */
+        @Nullable
+        private volatile String lastCondition;
+
+        /** True if any node owning the lock had failed. */
+        private volatile boolean isBroken;
+
+        /** UUID of the node that currently owns the lock. */
+        private volatile UUID currentOwnerNode;
+
+        /** ID of the thread that currently owns the lock. */
+        private volatile long currentOwnerThreadId;
+
+        /** UUID of this node. */
+        private final UUID thisNode;
+
+        /** FailoverSafe flag. */
+        private final boolean failoverSafe;
+
+        /** Fairness flag. */
+        private final boolean fair;
+
+        /** Threads that are waiting on this lock. */
+        private Set<Long> waitingThreads;
+
+        /**
+         * @param state State.
+         */
+        protected Sync(GridCacheLockState state) {
+            setState(state.get());
+
+            thisNode = ctx.localNodeId();
+
+            currentOwnerNode = state.getId();
+
+            currentOwnerThreadId = state.getThreadId();
+
+            conditionMap = new HashMap<>();
+
+            outgoingSignals = new HashMap<>();
+
+            failoverSafe = state.isFailoverSafe();
+
+            fair = state.isFair();
+
+            waitingThreads = new ConcurrentSkipListSet<>();
+        }
+
+        /**
+         *
+         */
+        protected void addOutgoingSignal(String condition) {
+            int cnt = 0;
+
+            if (outgoingSignals.containsKey(condition)) {
+                cnt = outgoingSignals.get(condition);
+
+                // SignalAll has already been called.
+                if (cnt == 0)
+                    return;
+            }
+
+            outgoingSignals.put(condition, cnt + 1);
+        }
+
+        protected void addOutgoingSignalAll(String condition) {
+            outgoingSignals.put(condition, 0);
+        }
+
+        /**
+         * Process any condition await calls on this node.
+         */
+        private String processAwait() {
+            if (lastCondition == null)
+                return null;
+
+            String ret = lastCondition;
+
+            lastCondition = null;
+
+            return ret;
+        }
+
+        /** */
+        private Map<String, Integer> processSignal() {
+            Map<String,Integer> ret = new HashMap<>(outgoingSignals);
+
+            outgoingSignals.clear();
+
+            return ret;
+        }
+
+        /** Interrupt every thread on this node waiting on this lock. */
+        private synchronized void interruptAll() {
+            // First release all threads waiting on associated condition queues.
+            if (!conditionMap.isEmpty()) {
+                // Temporarily obtain ownership of the lock,
+                // in order to signal all conditions.
+                UUID tempUUID = getOwnerNode();
+
+                long tempThreadID = currentOwnerThreadId;
+
+                setCurrentOwnerNode(thisNode);
+
+                currentOwnerThreadId = Thread.currentThread().getId();
+
+                for (Condition c : conditionMap.values())
+                    c.signalAll();
+
+                // Restore owner node and owner thread.
+                setCurrentOwnerNode(tempUUID);
+
+                currentOwnerThreadId = tempThreadID;
+            }
+
+            // Interrupt any future call to acquire/release on this sync object.
+            interruptAll = true;
+
+            // Interrupt any ongoing transactions.
+            for (Thread t: getQueuedThreads())
+                t.interrupt();
+        }
+
+        /** Check if lock is in correct state (i.e. not broken in non-failoversafe mode),
+         * if not throw  {@linkplain IgniteInterruptedException} */
+        private void validate(final boolean throwInterrupt) {
+            // Interrupted flag shouldn't be always cleared
+            // (e.g. lock() method doesn't throw exception and doesn't clear interrupted)
+            // but should be cleared if this method is called after lock breakage or node stop.
+            // If interruptAll is set, exception is thrown anyway.
+            boolean interrupted = Thread.currentThread().isInterrupted();
+
+            // Clear interrupt flag.
+            if (throwInterrupt || interruptAll)
+                Thread.interrupted();
+
+            if (interruptAll)
+                throw new IgniteException("Lock broken (possible reason: node stopped" +
+                    " or node owning lock failed while in non-failoversafe mode).");
+
+            // Global queue should be synchronized only if interrupted exception should be thrown.
+            if (fair && (throwInterrupt && interrupted) && !interruptAll) {
+                synchronizeQueue(true, Thread.currentThread());
+
+                throw new IgniteInterruptedException("Lock is interrupted.");
+            }
+        }
+
+        /**
+         * Sets the number of permits currently acquired on this lock. This method should only be used in {@linkplain
+         * GridCacheLockImpl#onUpdate(GridCacheLockState)}.
+         *
+         * @param permits Number of permits acquired at this reentrant lock.
+         */
+        final synchronized void setPermits(int permits) {
+            setState(permits);
+        }
+
+        /**
+         * Gets the number of permissions currently acquired at this lock.
+         *
+         * @return Number of permits acquired at this reentrant lock.
+         */
+        final int getPermits() {
+            return getState();
+        }
+
+        /**
+         * Sets the UUID of the node that currently owns this lock. This method should only be used in {@linkplain
+         * GridCacheLockImpl#onUpdate(GridCacheLockState)}.
+         *
+         * @param ownerNode UUID of the node owning this lock.
+         */
+        final synchronized void setCurrentOwnerNode(UUID ownerNode) {
+            currentOwnerNode = ownerNode;
+        }
+
+        /**
+         * Gets the UUID of the node that currently owns the lock.
+         *
+         * @return UUID of the node that currently owns the lock.
+         */
+        final UUID getOwnerNode() {
+            return currentOwnerNode;
+        }
+
+        /**
+         * Checks if latest call to acquire/release was called on this node.
+         * Should only be called from update method.
+         *
+         * @param newOwnerID ID of the node that is about to acquire this lock (or null).
+         * @return true if acquire/release that triggered last update came from this node.
+         */
+        protected boolean isLockedLocally(UUID newOwnerID) {
+            return thisNode.equals(getOwnerNode()) || thisNode.equals(newOwnerID);
+        }
+
+        protected void setCurrentOwnerThread(long newOwnerThreadId) {
+            currentOwnerThreadId = newOwnerThreadId;
+        }
+
+        /**
+         * Returns true if node that owned the locked failed before call to unlock.
+         *
+         * @return true if any node failed while owning the lock.
+         */
+        protected boolean isBroken() {
+            return isBroken;
+        }
+
+        /** */
+        protected void setBroken(boolean isBroken) {
+            this.isBroken = isBroken;
+        }
+
+        /** */
+        protected synchronized boolean hasPredecessor(LinkedList<UUID> nodes) {
+            if (!fair)
+                return false;
+
+            for (Iterator<UUID> it = nodes.iterator(); it.hasNext(); ) {
+                UUID node = it.next();
+
+                if (ctx.discovery().node(node) == null) {
+                    it.remove();
+
+                    continue;
+                }
+
+                return !node.equals(thisNode);
+            }
+
+            return false;
+        }
+
+        /**
+         * Performs tryLock.
+         * @param acquires Number of permits to acquire.
+         * @param fair Fairness parameter.
+         * @return {@code True} if succeeded, false otherwise.
+         */
+        final boolean tryAcquire(final int acquires, final boolean fair) {
+            // If broken in non-failoversafe mode, exit immediately.
+            if (interruptAll)
+                return true;
+
+            final Thread current = Thread.currentThread();
+
+            boolean failed = false;
+
+            int c = getState();
+
+            // Wait for lock to reach stable state.
+            while (c != 0) {
+                UUID currentOwner = currentOwnerNode;
+
+                if (currentOwner != null) {
+                    failed = ctx.discovery().node(currentOwner) == null;
+
+                    break;
+                }
+
+                c = getState();
+            }
+
+            // Check if lock is released or current owner failed.
+            if (c == 0 || failed) {
+                if (compareAndSetGlobalState(0, acquires, current, fair)) {
+
+                    // Not used for synchronization (we use ThreadID), but updated anyway.
+                    setExclusiveOwnerThread(current);
+
+                    while (!isHeldExclusively() && !interruptAll)
+                        Thread.yield();
+
+                    return true;
+                }
+            }
+            else if (isHeldExclusively()) {
+                int nextc = c + acquires;
+
+                if (nextc < 0) // overflow
+                    throw new Error("Maximum lock count exceeded.");
+
+                setState(nextc);
+
+                return true;
+            }
+
+            if (fair && !isQueued(current))
+                synchronizeQueue(false, current);
+
+            return false;
+        }
+
+        /**
+         * Performs lock.
+         */
+        final void lock() {
+            acquire(1);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected final boolean tryAcquire(int acquires) {
+            return tryAcquire(acquires, fair);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected final boolean tryRelease(int releases) {
+            // This method is called with release==0 only when trying to wake through update,
+            // to check if some other node released the lock.
+            if (releases == 0)
+                return true;
+
+            // If broken in non-failoversafe mode, exit immediately.
+            if (interruptAll)
+                return true;
+
+            int c = getState() - releases;
+
+            if (!isHeldExclusively()) {
+                log.error("Lock.unlock() is called in illegal state [callerNodeId=" + thisNode + ", ownerNodeId="
+                    + currentOwnerNode + ", callerThreadId=" + Thread.currentThread().getId() + ", ownerThreadId="
+                    + currentOwnerThreadId + ", lockState=" + getState() + "]");
+
+                throw new IllegalMonitorStateException();
+            }
+
+            boolean free = false;
+
+            if (c == 0) {
+                free = true;
+
+                setGlobalState(0, processAwait(), processSignal());
+
+                while (isHeldExclusively() && !interruptAll)
+                    Thread.yield();
+            }
+            else
+                setState(c);
+
+            return free;
+        }
+
+
+        /** {@inheritDoc} */
+        @Override protected final boolean isHeldExclusively() {
+            // While we must in general read state before owner,
+            // we don't need to do so to check if current thread is owner
+
+            return currentOwnerThreadId == Thread.currentThread().getId() && thisNode.equals(currentOwnerNode);
+        }
+
+        /**
+         * @param name Condition name.
+         * @return Condition object.
+         */
+        final synchronized IgniteCondition newCondition(String name) {
+            if (conditionMap.containsKey(name))
+                return new IgniteConditionObject(name, conditionMap.get(name));
+
+            ConditionObject cond = new ConditionObject();
+
+            conditionMap.put(name, cond);
+
+            return new IgniteConditionObject(name, cond);
+        }
+
+        // Methods relayed from outer class
+
+        final int getHoldCount() {
+            return isHeldExclusively() ? getState() : 0;
+        }
+
+        final boolean isLocked() throws IgniteCheckedException {
+            return getState() != 0 || lockView.get(key).get() != 0;
+        }
+
+        /**
+         * This method is used for synchronizing the reentrant lock state across all nodes.
+         */
+        protected boolean compareAndSetGlobalState(final int expVal, final int newVal,
+            final Thread newThread, final boolean bargingProhibited) {
+            try {
+                return CU.outTx(
+                    retryTopologySafe(new Callable<Boolean>() {
+                        @Override public Boolean call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+
+                                GridCacheLockState val = lockView.get(key);
+
+                                if (val == null)
+                                    throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name);
+
+                                final long newThreadID = newThread.getId();
+
+                                LinkedList<UUID> nodes = val.getNodes();
+
+                                // Barging is prohibited in fair mode unless tryLock() is called.
+                                if (!(bargingProhibited && hasPredecessor(nodes))) {
+                                    if (val.get() == expVal || ctx.discovery().node(val.getId()) == null) {
+                                        val.set(newVal);
+
+                                        val.setId(thisNode);
+
+                                        val.setThreadId(newThreadID);
+
+                                        val.setSignals(null);
+
+                                        // This node is already in queue, except in cases where this is the only node
+                                        // or this is a call to tryLock(), in which case barging is ok.
+                                        // Queue is only updated if this is fair lock.
+                                        if (val.isFair() && (nodes.isEmpty() || !bargingProhibited))
+                                            nodes.addFirst(thisNode);
+
+                                        val.setNodes(nodes);
+
+                                        val.setChanged(true);
+
+                                        lockView.put(key, val);
+
+                                        tx.commit();
+
+                                        return true;
+                                    }
+                                }
+
+                                return false;
+                            }
+                            catch (Exception e) {
+                                if (interruptAll) {
+                                    log.info("Node is stopped (or lock is broken in non-failover safe mode)," +
+                                        " aborting transaction.");
+
+                                    // Return immediately, exception will be thrown later.
+                                    return true;
+                                }
+                                else {
+                                    if (Thread.currentThread().isInterrupted()) {
+                                        log.info("Thread is interrupted while attempting to acquire lock.");
+
+                                        // Delegate the decision to throw InterruptedException to the AQS.
+                                        sync.release(0);
+
+                                        return false;
+                                    }
+
+                                    U.error(log, "Failed to compare and set: " + this, e);
+                                }
+
+                                throw e;
+                            }
+                        }
+                    }),
+                    ctx
+                );
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+        }
+
+        /**
+         * This method is used for synchronizing the number of acquire attempts on this lock across all nodes.
+         *
+         * @param cancelled true if acquire attempt is cancelled, false if acquire attempt should be registered.
+         */
+        protected boolean synchronizeQueue(final boolean cancelled, final Thread thread) {
+            final AtomicBoolean interrupted = new AtomicBoolean(false);
+
+            try {
+                return CU.outTx(
+                    retryTopologySafe(new Callable<Boolean>() {
+                        @Override public Boolean call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+                                GridCacheLockState val = lockView.get(key);
+
+                                if (val == null)
+                                    throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name);
+
+                                LinkedList<UUID> nodes = val.getNodes();
+
+                                if (!cancelled) {
+                                    nodes.add(thisNode);
+
+                                    val.setChanged(false);
+
+                                    lockView.put(key, val);
+
+                                    tx.commit();
+
+                                    // Keep track of all threads that are queued in global queue.
+                                    // We deliberately don't use #sync.isQueued(), because AQS
+                                    // cancel threads immediately after throwing interrupted exception.
+                                    sync.waitingThreads.add(thread.getId());
+
+                                    return true;
+                                }
+                                else {
+                                    if (sync.waitingThreads.contains(thread.getId())) {
+                                        // Update other nodes if this is the first node in queue.
+                                        val.setChanged(nodes.lastIndexOf(thisNode) == 0);
+
+                                        nodes.removeLastOccurrence(thisNode);
+
+                                        lockView.put(key, val);
+
+                                        tx.commit();
+
+                                        sync.waitingThreads.remove(thread.getId());
+
+                                        return true;
+                                    }
+                                }
+
+                                return false;
+                            }
+                            catch (Exception e) {
+                                if (interruptAll) {
+                                    log.info("Node is stopped (or lock is broken in non-failover safe mode)," +
+                                        " aborting transaction.");
+
+                                    // Abort this attempt to synchronize queue and start another one,
+                                    // that will return immediately.
+                                    sync.release(0);
+
+                                    return false;
+                                }
+                                else {
+                                    // If thread got interrupted, abort this attempt to synchronize queue,
+                                    // clear interrupt flag and try again, and let the AQS decide
+                                    // whether to throw an exception or ignore it.
+                                    if (Thread.interrupted() || X.hasCause(e, InterruptedException.class)) {
+                                        interrupted.set(true);
+
+                                        throw new TransactionRollbackException("Thread got interrupted " +
+                                            "while synchronizing the global queue, retrying. ");
+                                    }
+
+                                    U.error(log, "Failed to synchronize global lock queue: " + this, e);
+                                }
+
+                                throw e;
+                            }
+                        }
+                    }),
+                    ctx
+                );
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+            finally {
+                // Restore interrupt flag and let AQS decide what to do with it.
+                if (interrupted.get())
+                    Thread.currentThread().interrupt();
+            }
+        }
+
+        /**
+         * Sets the global state across all nodes after releasing the reentrant lock.
+         *
+         * @param newVal New state.
+         * @param lastCondition Id of the condition await is called.
+         * @param outgoingSignals Map containing signal calls on this node since the last acquisition of the lock.
+         */
+        protected boolean setGlobalState(final int newVal, @Nullable final String lastCondition, final Map<String, Integer> outgoingSignals) {
+            try {
+                return CU.outTx(
+                    retryTopologySafe(new Callable<Boolean>() {
+                        @Override public Boolean call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+                                GridCacheLockState val = lockView.get(key);
+
+                                if (val == null)
+                                    throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name);
+
+                                val.set(newVal);
+
+                                if (newVal == 0) {
+                                    val.setId(null);
+
+                                    val.setThreadId(LOCK_FREE);
+                                }
+
+                                val.setChanged(true);
+
+                                // If this lock is fair, remove this node from queue.
+                                if (val.isFair() && newVal == 0) {
+                                    UUID removedNode = val.getNodes().removeFirst();
+
+                                    assert(thisNode.equals(removedNode));
+                                }
+
+                                // Get global condition queue.
+                                Map<String, LinkedList<UUID>> condMap = val.getConditionMap();
+
+                                // Create map containing signals from this node.
+                                Map<UUID, LinkedList<String>> signalMap = new HashMap<>();
+
+                                // Put any signal calls on this node to global state.
+                                if (!outgoingSignals.isEmpty()) {
+                                    for (String condition : outgoingSignals.keySet()) {
+                                        int cnt = outgoingSignals.get(condition);
+
+                                        // Get queue for this condition.
+                                        List<UUID> list = condMap.get(condition);
+
+                                        if (list != null && !list.isEmpty()) {
+                                            // Check if signalAll was called.
+                                            if (cnt == 0) {
+                                                cnt = list.size();
+                                            }
+
+                                            // Remove from global condition queue.
+                                            for (int i = 0; i < cnt; i++) {
+                                                if (list.isEmpty())
+                                                    break;
+
+                                                UUID uuid = list.remove(0);
+
+                                                // Skip if node to be released is not alive anymore.
+                                                if (ctx.discovery().node(uuid) == null) {
+                                                    cnt++;
+
+                                                    continue;
+                                                }
+
+                                                LinkedList<String> queue = signalMap.get(uuid);
+
+                                                if (queue == null) {
+                                                    queue = new LinkedList<>();
+
+                                                    signalMap.put(uuid, queue);
+                                                }
+
+                                                queue.add(condition);
+                                            }
+                                        }
+                                    }
+                                }
+
+                                val.setSignals(signalMap);
+
+                                // Check if this release is called after condition.await() call;
+                                // If true, add this node to the global waiting queue.
+                                if (lastCondition != null) {
+                                    LinkedList<UUID> queue;
+
+                                    //noinspection IfMayBeConditional
+                                    if (!condMap.containsKey(lastCondition))
+                                        // New condition object.
+                                        queue = new LinkedList<>();
+                                    else
+                                        // Existing condition object.
+                                        queue = condMap.get(lastCondition);
+
+                                    queue.add(thisNode);
+
+                                    condMap.put(lastCondition, queue);
+                                }
+
+                                val.setConditionMap(condMap);
+
+                                lockView.put(key, val);
+
+                                tx.commit();
+
+                                return true;
+                            }
+                            catch (Exception e) {
+                                if (interruptAll) {
+                                    log.info("Node is stopped (or lock is broken in non-failover safe mode)," +
+                                        " aborting transaction.");
+
+                                    return true;
+                                }
+                                else
+                                    U.error(log, "Failed to release: " + this, e);
+
+                                throw e;
+                            }
+                        }
+                    }),
+                    ctx
+                );
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+        }
+
+        protected synchronized boolean checkIncomingSignals(GridCacheLockState state) {
+            if (state.getSignals() == null)
+                return false;
+
+            LinkedList<String> signals = state.getSignals().get(thisNode);
+
+            if (signals == null || signals.isEmpty())
+                return false;
+
+            UUID tempUUID = getOwnerNode();
+
+            Thread tempThread = getExclusiveOwnerThread();
+
+            long tempThreadID = currentOwnerThreadId;
+
+            // Temporarily allow current thread to signal condition object.
+            // This is safe to do because:
+            // 1. if release was called on this node,
+            // it was called from currently active thread;
+            // 2. if release came from a thread on any other node,
+            // all threads on this node are already blocked.
+            setCurrentOwnerNode(thisNode);
+
+            setExclusiveOwnerThread(Thread.currentThread());
+
+            currentOwnerThreadId = Thread.currentThread().getId();
+
+            for (String signal: signals)
+                conditionMap.get(signal).signal();
+
+            // Restore owner node and owner thread.
+            setCurrentOwnerNode(tempUUID);
+
+            setExclusiveOwnerThread(tempThread);
+
+            currentOwnerThreadId = tempThreadID;
+
+            return true;
+        }
+
+        /**
+         *  Condition implementation for {@linkplain IgniteLock}.
+         *
+         **/
+        private class IgniteConditionObject implements IgniteCondition {
+            /** */
+            private final String name;
+
+            /** */
+            private final AbstractQueuedSynchronizer.ConditionObject object;
+
+            /**
+             * @param name Condition name.
+             * @param object Condition object.
+             */
+            protected IgniteConditionObject(String name, ConditionObject object) {
+                this.name = name;
+
+                this.object = object;
+            }
+
+            /**
+             * Name of this condition.
+             *
+             * @return name Name of this condition object.
+             */
+            @Override public String name() {
+                return name;
+            }
+
+            /** {@inheritDoc} */
+            @Override public void await() throws IgniteInterruptedException {
+                ctx.kernalContext().gateway().readLock();
+
+                try {
+                    if (!isHeldExclusively())
+                        throw new IllegalMonitorStateException();
+
+                    lastCondition = name;
+
+                    object.await();
+
+                    sync.validate(true);
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteInterruptedException(e);
+                }
+                finally {
+                    ctx.kernalContext().gateway().readUnlock();
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public void awaitUninterruptibly() {
+                ctx.kernalContext().gateway().readLock();
+
+                try {
+                    if (!isHeldExclusively())
+                        throw new IllegalMonitorStateException();
+
+                    lastCondition = name;
+
+                    object.awaitUninterruptibly();
+
+                    sync.validate(false);
+                }
+                finally {
+                    ctx.kernalContext().gateway().readUnlock();
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public long awaitNanos(long nanosTimeout) throws IgniteInterruptedException {
+                ctx.kernalContext().gateway().readLock();
+
+                try {
+                    if (!isHeldExclusively())
+                        throw new IllegalMonitorStateException();
+
+                    lastCondition = name;
+
+                    long result =  object.awaitNanos(nanosTimeout);
+
+                    sync.validate(true);
+
+                    return result;
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteInterruptedException(e);
+                }
+                finally {
+                    ctx.kernalContext().gateway().readUnlock();
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public boolean await(long time, TimeUnit unit) throws IgniteInterruptedException {
+                ctx.kernalContext().gateway().readLock();
+
+                try {
+                    if (!isHeldExclusively())
+                        throw new IllegalMonitorStateException();
+
+                    lastCondition = name;
+
+                    boolean result = object.await(time, unit);
+
+                    sync.validate(true);
+
+                    return result;
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteInterruptedException(e);
+                }
+                finally {
+                    ctx.kernalContext().gateway().readUnlock();
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public boolean awaitUntil(Date deadline) throws IgniteInterruptedException {
+                ctx.kernalContext().gateway().readLock();
+
+                try {
+                    if (!isHeldExclusively())
+                        throw new IllegalMonitorStateException();
+
+                    lastCondition = name;
+
+                    boolean result = object.awaitUntil(deadline);
+
+                    sync.validate(true);
+
+                    return result;
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteInterruptedException(e);
+                }
+                finally {
+                    ctx.kernalContext().gateway().readUnlock();
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public void signal() {
+                ctx.kernalContext().gateway().readLock();
+
+                try {
+                    if (!isHeldExclusively())
+                        throw new IllegalMonitorStateException();
+
+                    validate(false);
+
+                    addOutgoingSignal(name);
+                }
+                finally {
+                    ctx.kernalContext().gateway().readUnlock();
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public void signalAll() {
+                ctx.kernalContext().gateway().readLock();
+
+                try {
+                    if (!isHeldExclusively())
+                        throw new IllegalMonitorStateException();
+
+                    sync.validate(false);
+
+                    addOutgoingSignalAll(name);
+                }
+                finally {
+                    ctx.kernalContext().gateway().readUnlock();
+                }
+            }
+        }
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param name Reentrant lock name.
+     * @param key Reentrant lock key.
+     * @param lockView Reentrant lock projection.
+     * @param ctx Cache context.
+     */
+    @SuppressWarnings("unchecked")
+    public GridCacheLockImpl(String name,
+        GridCacheInternalKey key,
+        IgniteInternalCache<GridCacheInternalKey, GridCacheLockState> lockView,
+        GridCacheContext ctx) {
+        assert name != null;
+        assert key != null;
+        assert ctx != null;
+        assert lockView != null;
+
+        this.name = name;
+        this.key = key;
+        this.lockView = lockView;
+        this.ctx = ctx;
+
+        log = ctx.logger(getClass());
+    }
+
+    /**
+     * @throws IgniteCheckedException If operation failed.
+     */
+    private void initializeReentrantLock() throws IgniteCheckedException {
+        if (initGuard.compareAndSet(false, true)) {
+            try {
+                sync = CU.outTx(
+                    retryTopologySafe(new Callable<Sync>() {
+                        @Override public Sync call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+                                GridCacheLockState val = lockView.get(key);
+
+                                if (val == null) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to find reentrant lock with given name: " + name);
+
+                                    return null;
+                                }
+
+                                tx.rollback();
+
+                                return new Sync(val);
+                            }
+                        }
+                    }),
+                    ctx
+                );
+
+                if (log.isDebugEnabled())
+                    log.debug("Initialized internal sync structure: " + sync);
+            }
+            finally {
+                initLatch.countDown();
+            }
+        }
+        else {
+            U.await(initLatch);
+
+            if (sync == null)
+                throw new IgniteCheckedException("Internal reentrant lock has not been properly initialized.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onUpdate(GridCacheLockState val) {
+        // Called only on initialization, so it's safe to ignore update.
+        if (sync == null)
+            return;
+
+        updateLock.lock();
+
+        try {
+            // If this update is a result of unsuccessful acquire in fair mode, no local update should be done.
+            if (!val.isChanged())
+                return;
+
+            // Check if update came from this node.
+            boolean local = sync.isLockedLocally(val.getId());
+
+            // Process any incoming signals.
+            boolean incomingSignals = sync.checkIncomingSignals(val);
+
+            // Update permission count.
+            sync.setPermits(val.get());
+
+            // Update owner's node id.
+            sync.setCurrentOwnerNode(val.getId());
+
+            // Update owner's thread id.
+            sync.setCurrentOwnerThread(val.getThreadId());
+
+            // Check if any threads waiting on this node need to be notified.
+            if ((incomingSignals || sync.getPermits() == 0) && !local) {
+                // Try to notify any waiting threads.
+                sync.release(0);
+            }
+
+        } finally{
+            updateLock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onNodeRemoved(UUID nodeId) {
+        updateLock.lock();
+
+        try {
+            if (nodeId.equals(sync.getOwnerNode())) {
+                sync.setBroken(true);
+
+                if (!sync.failoverSafe) {
+                    sync.interruptAll();
+                }
+            }
+
+            // Try to notify any waiting threads.
+            sync.release(0);
+        }
+        finally {
+            updateLock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onStop() {
+        if (sync == null) {
+            interruptAll = true;
+
+            return;
+        }
+
+        sync.setBroken(true);
+
+        sync.interruptAll();
+
+        // Try to notify any waiting threads.
+        sync.release(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void lock() {
+        ctx.kernalContext().gateway().readLock();
+
+        try{
+            initializeReentrantLock();
+
+            sync.lock();
+
+            sync.validate(false);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void lockInterruptibly() throws IgniteInterruptedException {
+        ctx.kernalContext().gateway().readLock();
+
+        try {
+            initializeReentrantLock();
+
+            sync.acquireInterruptibly(1);
+
+            sync.validate(true);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        catch (InterruptedException e) {
+            if (sync.fair)
+                sync.synchronizeQueue(true, Thread.currentThread());
+
+            throw new IgniteInterruptedException(e);
+        }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tryLock() {
+        ctx.kernalContext().gateway().readLock();
+
+        try{
+            initializeReentrantLock();
+
+            boolean result = sync.tryAcquire(1, false);
+
+            sync.validate(false);
+
+            return result;
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tryLock(long timeout, TimeUnit unit) throws IgniteInterruptedException {
+        ctx.kernalContext().gateway().readLock();
+
+        try{
+            initializeReentrantLock();
+
+            boolean result = sync.tryAcquireNanos(1, unit.toNanos(timeout));
+
+            sync.validate(true);
+
+            return result;
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        catch (InterruptedException e) {
+            if (sync.fair)
+                sync.synchronizeQueue(true, Thread.currentThread());
+
+            throw new IgniteInterruptedException(e);
+        }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void unlock() {
+        ctx.kernalContext().gateway().readLock();
+
+        try{
+            initializeReentrantLock();
+
+            // Validate before release.
+            sync.validate(false);
+
+            sync.release(1);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
+    }
+
+    @NotNull @Override public Condition newCondition() {
+        throw new UnsupportedOperationException("IgniteLock does not allow creation of nameless conditions. ");
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCondition getOrCreateCondition(String name) {
+        ctx.kernalContext().gateway().readLock();
+
+        try{
+            initializeReentrantLock();
+
+            IgniteCondition result = sync.newCondition(name);
+
+            sync.validate(false);
+
+            return result;
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getHoldCount() {
+        try{
+            initializeReentrantLock();
+
+            return sync.getHoldCount();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isHeldByCurrentThread() {
+        try{
+            initializeReentrantLock();
+
+            return sync.isHeldExclusively();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isLocked() {
+        try{
+            initializeReentrantLock();
+
+            return sync.isLocked();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasQueuedThreads() {
+        try{
+            initializeReentrantLock();
+
+            return sync.hasQueuedThreads();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasQueuedThread(Thread thread) {
+        try{
+            initializeReentrantLock();
+
+            return sync.isQueued(thread);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasWaiters(IgniteCondition condition) {
+        try{
+            initializeReentrantLock();
+
+            AbstractQueuedSynchronizer.ConditionObject c = sync.conditionMap.get(condition.name());
+
+            if (c == null)
+                throw new IllegalArgumentException();
+
+            return sync.hasWaiters(c);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getWaitQueueLength(IgniteCondition condition) {
+        try{
+            initializeReentrantLock();
+
+            AbstractQueuedSynchronizer.ConditionObject c = sync.conditionMap.get(condition.name());
+
+            if (c == null)
+                throw new IllegalArgumentException();
+
+            return sync.getWaitQueueLength(c);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    @Override public boolean isFailoverSafe() {
+        try{
+            initializeReentrantLock();
+
+            return sync.failoverSafe;
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    @Override public boolean isFair() {
+        try{
+            initializeReentrantLock();
+
+            return sync.fair;
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isBroken() {
+        try{
+            initializeReentrantLock();
+
+            return sync.isBroken();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheInternalKey key() {
+        return key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean removed() {
+        return rmvd;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onRemoved() {
+        return rmvd = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void needCheckNotRemoved() {
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeUTF(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        stash.set(in.readUTF());
+    }
+
+    /**
+     * Reconstructs object on unmarshalling.
+     *
+     * @return Reconstructed object.
+     * @throws ObjectStreamException Thrown in case of unmarshalling error.
+     */
+    private Object readResolve() throws ObjectStreamException {
+        String name = stash.get();
+
+        assert name != null;
+
+        try {
+            IgniteLock lock = IgnitionEx.localIgnite().context().dataStructures().reentrantLock(
+                name,
+                false,
+                false,
+                false);
+
+            if (lock == null)
+                throw new IllegalStateException("Lock was not found on deserialization: " + name);
+
+            return lock;
+        }
+        catch (IgniteCheckedException e) {
+            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+        }
+        finally {
+            stash.remove();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (!rmvd) {
+            try {
+                boolean force = sync != null && (sync.isBroken() && !sync.failoverSafe);
+
+                ctx.kernalContext().dataStructures().removeReentrantLock(name, force);
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheLockImpl.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java
new file mode 100644
index 0000000..3feb9bf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *  Grid cache reentrant lock state.
+ */
+public final class GridCacheLockState implements GridCacheInternal, Externalizable, Cloneable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Count. */
+    private int cnt;
+
+    /** Owner thread local ID. */
+    private long threadId;
+
+    /** Owner node ID. */
+    private UUID id;
+
+    /** FailoverSafe flag. */
+    private boolean failoverSafe;
+
+    /** Map containing state for each condition object associated with this lock. */
+    @GridToStringInclude
+    private Map<String, LinkedList<UUID>> conditionMap;
+
+    /** Map containing unprocessed signals for condition objects that are associated with this lock. */
+    @GridToStringInclude
+    private Map<UUID, LinkedList<String>> signals;
+
+    /** Flag indicating lock fairness. */
+    private boolean fair;
+
+    /** Queue containing nodes that are waiting to acquire this lock, used to ensure fairness. */
+    @GridToStringInclude
+    private LinkedList<UUID> nodes;
+
+    /**
+     * Flag indicating that global state changed.
+     * Used in fair mode to ensure that only successful acquires and releases trigger update.
+     */
+    private boolean changed;
+
+    /**
+     * Constructor.
+     *
+     * @param cnt Initial count.
+     * @param id UUID of owning node.
+     * @param threadID ID of the current thread.
+     * @param failoverSafe true if created in failoverSafe mode.
+     * @param fair true if created in fair mode.
+     */
+    public GridCacheLockState(int cnt, UUID id, long threadID, boolean failoverSafe, boolean fair) {
+        assert cnt >= 0;
+
+        this.id = id;
+
+        this.threadId = threadID;
+
+        conditionMap = new HashMap();
+
+        signals = null;
+
+        nodes = new LinkedList<UUID>();
+
+        this.fair = fair;
+
+        this.failoverSafe = failoverSafe;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridCacheLockState() {
+        // No-op.
+    }
+
+    /**
+     * @param cnt New count.
+     */
+    public void set(int cnt) {
+        this.cnt = cnt;
+    }
+
+    /**
+     * @return Current count.
+     */
+    public int get() {
+        return cnt;
+    }
+
+    /**
+     * @return Current owner thread ID.
+     */
+    public long getThreadId() {
+        return threadId;
+    }
+
+    /**
+     * @param threadId New thread owner ID.
+     */
+    public void setThreadId(long threadId) {
+        this.threadId = threadId;
+    }
+
+    /**
+     * @return Current owner node ID.
+     */
+    public UUID getId() {
+        return id;
+    }
+
+    /**
+     * @return New owner node ID.
+     */
+    public void setId(UUID id) {
+        this.id = id;
+    }
+
+    /**
+     * @return Failover safe flag.
+     */
+    public boolean isFailoverSafe() {
+        return failoverSafe;
+    }
+
+    /**
+     * @return Condition count.
+     */
+    public int condtionCount(){
+        return conditionMap.size();
+    }
+
+    /**
+     * @return Condition map.
+     */
+    public Map<String, LinkedList<UUID>> getConditionMap() {
+        return conditionMap;
+    }
+
+    /**
+     * @param conditionMap Condition map.
+     */
+    public void setConditionMap(Map<String, LinkedList<UUID>> conditionMap) {
+        this.conditionMap = conditionMap;
+    }
+
+    /**
+     * @return Signals.
+     */
+    public Map<UUID, LinkedList<String>> getSignals() {
+        return signals;
+    }
+
+    /**
+     * @param signals Signals.
+     */
+    public void setSignals(Map<UUID, LinkedList<String>> signals) {
+        this.signals = signals;
+    }
+
+    /**
+     * @return Nodes.
+     */
+    public LinkedList<UUID> getNodes() {
+        return nodes;
+    }
+
+    /**
+     * @param nodes Nodes.
+     */
+    public void setNodes(LinkedList<UUID> nodes) {
+        this.nodes = nodes;
+    }
+
+    /**
+     * @return Fair flag.
+     */
+    public boolean isFair() {
+        return fair;
+    }
+
+    /**
+     * @return Changed flag.
+     */
+    public boolean isChanged() {
+        return changed;
+    }
+
+    /**
+     * @param changed Changed flag.
+     */
+    public void setChanged(boolean changed) {
+        this.changed = changed;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(cnt);
+        out.writeLong(threadId);
+        U.writeUuid(out, id);
+
+        out.writeBoolean(failoverSafe);
+
+        out.writeBoolean(fair);
+
+        out.writeBoolean(changed);
+
+        out.writeBoolean(conditionMap != null);
+
+        if (conditionMap != null) {
+            out.writeInt(conditionMap.size());
+
+            for (Map.Entry<String, LinkedList<UUID>> e : conditionMap.entrySet()) {
+                U.writeString(out, e.getKey());
+
+                out.writeInt(e.getValue().size());
+
+                for (UUID uuid:e.getValue())
+                    U.writeUuid(out, uuid);
+            }
+        }
+
+        out.writeBoolean(signals != null);
+
+        if (signals != null) {
+            out.writeInt(signals.size());
+
+            for (Map.Entry<UUID, LinkedList<String>> e : signals.entrySet()) {
+                U.writeUuid(out, e.getKey());
+
+                out.writeInt(e.getValue().size());
+
+                for (String condition:e.getValue())
+                    U.writeString(out, condition);
+            }
+        }
+
+        out.writeBoolean(nodes != null);
+
+        if (nodes != null) {
+            out.writeInt(nodes.size());
+
+            for (UUID uuid: nodes)
+                U.writeUuid(out, uuid);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException {
+        cnt = in.readInt();
+        threadId = in.readLong();
+        id = U.readUuid(in);
+
+        failoverSafe = in.readBoolean();
+
+        fair = in.readBoolean();
+
+        changed = in.readBoolean();
+
+        if (in.readBoolean()) {
+            int size = in.readInt();
+
+            conditionMap = U.newLinkedHashMap(size);
+
+            for (int i = 0; i < size; i++) {
+                String key = U.readString(in);
+
+                int size1 = in.readInt();
+
+                LinkedList<UUID> list = new LinkedList();
+
+                for (int j = 0; j < size1; j++)
+                    list.add(U.readUuid(in));
+
+                conditionMap.put(key, list);
+            }
+        }
+
+        if (in.readBoolean()) {
+            assert (conditionMap != null);
+
+            int size = in.readInt();
+
+            signals = U.newLinkedHashMap(size);
+
+            for (int i = 0; i < size; i++) {
+                UUID node = U.readUuid(in);
+
+                int size1 = in.readInt();
+
+                LinkedList<String> list = new LinkedList();
+
+                for (int j = 0; j < size1; j++)
+                    list.add(U.readString(in));
+
+                signals.put(node, list);
+            }
+        }
+        else
+            signals = null;
+
+        if (in.readBoolean()) {
+            int size = in.readInt();
+
+            nodes = new LinkedList();
+
+            for (int i = 0; i < size; i++)
+                nodes.add(U.readUuid(in));
+        }
+        else
+            nodes = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheLockState.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index c365f9d..8f196be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -19,8 +19,10 @@ package org.apache.ignite.internal.processors.datastructures;
 
 import java.io.Externalizable;
 import java.io.IOException;
+import java.io.InvalidObjectException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.io.ObjectStreamException;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -32,7 +34,9 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -885,6 +889,35 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         t.set2(in.readUTF());
     }
 
+    /**
+     * Reconstructs object on unmarshalling.
+     *
+     * @return Reconstructed object.
+     * @throws ObjectStreamException Thrown in case of unmarshalling error.
+     */
+    private Object readResolve() throws ObjectStreamException {
+        try {
+            IgniteBiTuple<GridKernalContext, String> t = stash.get();
+
+            IgniteSemaphore sem = IgnitionEx.localIgnite().context().dataStructures().semaphore(
+                t.get2(),
+                0,
+                false,
+                false);
+
+            if (sem == null)
+                throw new IllegalStateException("Semaphore was not found on deserialization: " + t.get2());
+
+            return sem;
+        }
+        catch (IgniteCheckedException e) {
+            throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+        }
+        finally {
+            stash.remove();
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void close() {
         if (!rmvd) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 034c314..9ae74a7 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1037,6 +1037,8 @@ org.apache.ignite.internal.processors.datastructures.GridCacheQueueItemKey
 org.apache.ignite.internal.processors.datastructures.GridCacheQueueProxy
 org.apache.ignite.internal.processors.datastructures.GridCacheSemaphoreImpl
 org.apache.ignite.internal.processors.datastructures.GridCacheSemaphoreState
+org.apache.ignite.internal.processors.datastructures.GridCacheLockImpl
+org.apache.ignite.internal.processors.datastructures.GridCacheLockState
 org.apache.ignite.internal.processors.datastructures.GridCacheSetHeader
 org.apache.ignite.internal.processors.datastructures.GridCacheSetHeaderKey
 org.apache.ignite.internal.processors.datastructures.GridCacheSetImpl$CollocatedItemKey

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index 4653ce9..4622fff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteAtomicStamped;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteLock;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -774,4 +775,61 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
         assertFalse(srvSemaphore.tryAcquire());
         assertFalse(srvSemaphore.tryAcquire());
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReentrantLockReconnect() throws Exception {
+        testReentrantLockReconnect(false);
+
+        testReentrantLockReconnect(true);
+    }
+
+    private void testReentrantLockReconnect(final boolean fair) throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        IgniteLock clientLock = client.reentrantLock("lock1", true, fair, true);
+
+        assertEquals(false, clientLock.isLocked());
+
+        final IgniteLock srvLock = srv.reentrantLock("lock1", true, fair, true);
+
+        assertEquals(false, srvLock.isLocked());
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvLock.lock();
+            }
+        });
+
+        assertTrue(srvLock.isLocked());
+        assertTrue(clientLock.isLocked());
+
+        assertEquals(1, srvLock.getHoldCount());
+
+        srvLock.lock();
+
+        assertTrue(srvLock.isLocked());
+        assertTrue(clientLock.isLocked());
+
+        assertEquals(2, srvLock.getHoldCount());
+
+        srvLock.unlock();
+
+        assertTrue(srvLock.isLocked());
+        assertTrue(clientLock.isLocked());
+
+        assertEquals(1, srvLock.getHoldCount());
+
+        srvLock.unlock();
+
+        assertFalse(srvLock.isLocked());
+        assertFalse(clientLock.isLocked());
+
+        assertEquals(0, srvLock.getHoldCount());
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
index 74023e9..6ba65ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java
@@ -33,9 +33,11 @@ import org.apache.ignite.IgniteAtomicLong;
 import org.apache.ignite.IgniteAtomicReference;
 import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteAtomicStamped;
+import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCountDownLatch;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLock;
 import org.apache.ignite.IgniteQueue;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.cache.CacheMode;
@@ -75,10 +77,10 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
     private static final String TRANSACTIONAL_CACHE_NAME = "tx_cache";
 
     /** */
-    private static final int TOP_CHANGE_CNT = 5;
+    private static final int TOP_CHANGE_CNT = 2;
 
     /** */
-    private static final int TOP_CHANGE_THREAD_CNT = 3;
+    private static final int TOP_CHANGE_THREAD_CNT = 2;
 
     /** */
     private boolean client;
@@ -586,6 +588,208 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig
     /**
      * @throws Exception If failed.
      */
+    public void testReentrantLockFailsWhenServersLeft() throws Exception {
+        testReentrantLockFailsWhenServersLeft(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFairReentrantLockFailsWhenServersLeft() throws Exception {
+        testReentrantLockFailsWhenServersLeft(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReentrantLockFailsWhenServersLeft(final boolean fair) throws Exception {
+        client = true;
+
+        Ignite client = startGrid(gridCount());
+
+        Ignite server = grid(0);
+
+        // Initialize lock.
+        IgniteLock srvLock = server.reentrantLock("lock", true, fair, true);
+
+        IgniteSemaphore semaphore = server.semaphore("sync", 0, true, true);
+
+        IgniteCompute compute = client.compute().withAsync();
+
+        compute.apply(new IgniteClosure<Ignite, Object>() {
+            @Override public Object apply(Ignite ignite) {
+                final IgniteLock l = ignite.reentrantLock("lock", true, fair, true);
+
+                l.lock();
+
+                assertTrue(l.isHeldByCurrentThread());
+
+                l.unlock();
+
+                assertFalse(l.isHeldByCurrentThread());
+
+                // Signal the server to go down.
+                ignite.semaphore("sync", 0, true, true).release();
+
+                boolean isExceptionThrown = false;
+
+                try {
+                    // Wait for the server to go down.
+                    Thread.sleep(1000);
+
+                    l.lock();
+
+                    fail("Exception must be thrown.");
+                }
+                catch (InterruptedException e) {
+                    fail("Interrupted exception not expected here.");
+                }
+                catch (IgniteException e) {
+                    isExceptionThrown = true;
+                }
+                finally {
+                    assertTrue(isExceptionThrown);
+
+                    assertFalse(l.isHeldByCurrentThread());
+                }
+                return null;
+            }
+        }, client);
+
+        // Wait for the lock on client to be acquired then released.
+        semaphore.acquire();
+
+        for (int i = 0; i < gridCount(); i++)
+            stopGrid(i);
+
+        compute.future().get();
+
+        client.close();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReentrantLockConstantTopologyChangeFailoverSafe() throws Exception {
+        doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReentrantLockConstantMultipleTopologyChangeFailoverSafe() throws Exception {
+        doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReentrantLockConstantTopologyChangeNonFailoverSafe() throws Exception {
+        doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReentrantLockConstantMultipleTopologyChangeNonFailoverSafe() throws Exception {
+        doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFairReentrantLockConstantTopologyChangeFailoverSafe() throws Exception {
+        doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFairReentrantLockConstantMultipleTopologyChangeFailoverSafe() throws Exception {
+        doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFairReentrantLockConstantTopologyChangeNonFailoverSafe() throws Exception {
+        doTestReentrantLock(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFairReentrantLockConstantMultipleTopologyChangeNonFailoverSafe() throws Exception {
+        doTestReentrantLock(multipleTopologyChangeWorker(TOP_CHANGE_THREAD_CNT), false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTestReentrantLock(ConstantTopologyChangeWorker topWorker, final boolean failoverSafe, final boolean fair) throws Exception {
+        try (IgniteLock lock = grid(0).reentrantLock(STRUCTURE_NAME, failoverSafe, fair, true)) {
+            IgniteInternalFuture<?> fut = topWorker.startChangingTopology(new IgniteClosure<Ignite, Object>() {
+                @Override public Object apply(Ignite ignite) {
+                    final IgniteLock l = ignite.reentrantLock(STRUCTURE_NAME, failoverSafe, fair, false);
+
+                    final AtomicBoolean done = new AtomicBoolean(false);
+
+                    IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+                        @Override public Void call() throws Exception {
+                            try{
+                                l.lock();
+                            }
+                            finally {
+                                done.set(true);
+                            }
+
+                            return null;
+                        }
+                    });
+
+                    // Wait until l.lock() has been called.
+                    while(!l.hasQueuedThreads() && !done.get()){
+                        // No-op.
+                    }
+
+                    return null;
+                }
+            });
+
+            while (!fut.isDone()) {
+                while (true) {
+                    try {
+                        lock.lock();
+                    }
+                    catch (IgniteException e) {
+                        // Exception may happen in non-failoversafe mode.
+                        if (failoverSafe)
+                            throw e;
+                    }
+                    finally {
+                        // Broken lock cannot be used in non-failoversafe mode.
+                        if(!lock.isBroken() || failoverSafe) {
+                            assertTrue(lock.isHeldByCurrentThread());
+
+                            lock.unlock();
+
+                            assertFalse(lock.isHeldByCurrentThread());
+                        }
+                        break;
+                    }
+                }
+            }
+
+            fut.get();
+
+            for (Ignite g : G.allGrids())
+                assertFalse(g.reentrantLock(STRUCTURE_NAME, failoverSafe, fair, false).isHeldByCurrentThread());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testCountDownLatchConstantTopologyChange() throws Exception {
         doTestCountDownLatch(new ConstantTopologyChangeWorker(TOP_CHANGE_THREAD_CNT));
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
index 34e7080..5929d42 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
@@ -18,11 +18,13 @@
 package org.apache.ignite.internal.processors.cache.datastructures;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteAtomicLong;
 import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteLock;
 import org.apache.ignite.IgniteQueue;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteSet;
@@ -327,6 +329,74 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA
     /**
      * @throws Exception If failed.
      */
+    public void testReentrantLock() throws Exception {
+        Ignite clientNode = clientIgnite();
+        Ignite srvNode = serverNode();
+
+        testReentrantLock(clientNode, srvNode);
+        testReentrantLock(srvNode, clientNode);
+    }
+
+    /**
+     * @param creator Creator node.
+     * @param other Other node.
+     * @throws Exception If failed.
+     */
+    private void testReentrantLock(Ignite creator, final Ignite other) throws Exception {
+        assertNull(creator.reentrantLock("lock1", true, false, false));
+        assertNull(other.reentrantLock("lock1", true, false, false));
+
+        try (IgniteLock lock = creator.reentrantLock("lock1", true, false, true)) {
+            assertNotNull(lock);
+
+            assertFalse(lock.isLocked());
+
+            final Semaphore semaphore = new Semaphore(0);
+
+            IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    IgniteLock lock0 = other.reentrantLock("lock1", true, false, false);
+
+                    lock0.lock();
+
+                    assertTrue(lock0.isLocked());
+
+                    semaphore.release();
+
+                    U.sleep(1000);
+
+                    log.info("Release reentrant lock.");
+
+                    lock0.unlock();
+
+                    return null;
+                }
+            });
+
+            semaphore.acquire();
+
+            log.info("Try acquire lock.");
+
+            assertTrue(lock.tryLock(5000, TimeUnit.MILLISECONDS));
+
+            log.info("Finished wait.");
+
+            fut.get();
+
+            assertTrue(lock.isLocked());
+
+            lock.unlock();
+
+            assertFalse(lock.isLocked());
+        }
+
+        assertNull(creator.reentrantLock("lock1", true, false, false));
+        assertNull(other.reentrantLock("lock1", true, false, false));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testQueue() throws Exception {
         Ignite clientNode = clientIgnite();
         Ignite srvNode = serverNode();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
index e88c97b..a8e0095 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCountDownLatch;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteLock;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteSet;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -241,7 +242,7 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
     private void testUniqueName(final boolean singleGrid) throws Exception {
         final String name = IgniteUuid.randomUuid().toString();
 
-        final int DS_TYPES = 8;
+        final int DS_TYPES = 9;
 
         final int THREADS = DS_TYPES * 3;
 
@@ -322,6 +323,13 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
                                     res = ignite.semaphore(name, 0, false, true);
 
                                     break;
+
+                                case 8:
+                                    log.info("Create atomic reentrant lock, grid: " + ignite.name());
+
+                                    res = ignite.reentrantLock(name, true, true, true);
+
+                                    break;
                                 default:
                                     fail();
 
@@ -361,7 +369,8 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT
                         res instanceof IgniteCountDownLatch ||
                         res instanceof IgniteQueue ||
                         res instanceof IgniteSet ||
-                        res instanceof IgniteSemaphore);
+                        res instanceof IgniteSemaphore ||
+                        res instanceof IgniteLock);
 
                 log.info("Data structure created: " + dataStructure);
 


Mime
View raw message