ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [18/22] ignite git commit: IGNITE-642 Implement IgniteReentrantLock data structure
Date Thu, 28 Apr 2016 11:29:18 GMT
IGNITE-642 Implement IgniteReentrantLock data structure


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9bd9a33
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9bd9a33
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9bd9a33

Branch: refs/heads/ignite-2523-1-resp
Commit: f9bd9a3383c1a914970a1d0a391b345cc584430d
Parents: 2226377
Author: Vladisav Jelisavcic <vladisavj@gmail.com>
Authored: Wed Apr 27 18:31:26 2016 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Wed Apr 27 18:31:26 2016 +0300

----------------------------------------------------------------------
 .../datastructures/IgniteLockExample.java       |  293 ++++
 .../ignite/examples/CacheExamplesSelfTest.java  |    8 +
 .../src/main/java/org/apache/ignite/Ignite.java |   17 +
 .../java/org/apache/ignite/IgniteCondition.java |  338 ++++
 .../main/java/org/apache/ignite/IgniteLock.java |  489 ++++++
 .../ignite/internal/GridKernalContextImpl.java  |    4 +-
 .../apache/ignite/internal/IgniteKernal.java    |   21 +
 .../datastructures/DataStructuresProcessor.java |  158 +-
 .../datastructures/GridCacheLockEx.java         |   52 +
 .../datastructures/GridCacheLockImpl.java       | 1538 +++++++++++++++++
 .../datastructures/GridCacheLockState.java      |  353 ++++
 .../datastructures/GridCacheSemaphoreImpl.java  |   33 +
 .../resources/META-INF/classnames.properties    |    2 +
 .../IgniteClientReconnectAtomicsTest.java       |   58 +
 ...eAbstractDataStructuresFailoverSelfTest.java |  208 ++-
 .../IgniteClientDataStructuresAbstractTest.java |   70 +
 .../IgniteDataStructureUniqueNameTest.java      |   13 +-
 .../IgniteLockAbstractSelfTest.java             | 1629 ++++++++++++++++++
 .../IgniteSemaphoreAbstractSelfTest.java        |   31 +
 .../local/IgniteLocalLockSelfTest.java          |  110 ++
 .../IgnitePartitionedLockSelfTest.java          |   33 +
 .../IgniteReplicatedLockSelfTest.java           |   33 +
 .../distributed/GridCacheLockAbstractTest.java  |    2 +-
 .../cache/GridCacheDataStructuresLoadTest.java  |   53 +
 .../ignite/testframework/junits/IgniteMock.java |   10 +
 .../junits/multijvm/IgniteProcessProxy.java     |    7 +
 .../IgniteCacheDataStructuresSelfTestSuite.java |    6 +
 .../org/apache/ignite/IgniteSpringBean.java     |   11 +
 28 files changed, 5570 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java
new file mode 100644
index 0000000..1f84787
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteLockExample.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datastructures;
+
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCondition;
+import org.apache.ignite.IgniteLock;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.lang.IgniteRunnable;
+
+/**
+ * This example demonstrates cache based reentrant lock.
+ * <p>
+ * Remote nodes should always be started with special configuration
+ * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code
+ * examples/config/example-ignite.xml} configuration.
+ */
+public class IgniteLockExample {
+    /** Number of items for each producer/consumer to produce/consume. */
+    private static final int OPS_COUNT = 100;
+
+    /** Number of producers. */
+    private static final int NUM_PRODUCERS = 5;
+
+    /** Number of consumers. */
+    private static final int NUM_CONSUMERS = 5;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** Name of the global resource. */
+    private static final String QUEUE_ID = "queue";
+
+    /** Name of the synchronization variable. */
+    private static final String SYNC_NAME = "done";
+
+    /** Name of the condition object. */
+    private static final String NOT_FULL = "notFull";
+
+    /** Name of the condition object. */
+    private static final String NOT_EMPTY = "notEmpty";
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     */
+    public static void main(String[] args) {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache atomic reentrant lock example started.");
+
+            // Make name of reentrant lock.
+            final String reentrantLockName = UUID.randomUUID().toString();
+
+            // Initialize lock.
+            IgniteLock lock = ignite.reentrantLock(reentrantLockName, true, false, true);
+
+            // Init distributed cache.
+            IgniteCache<String, Integer> cache = ignite.getOrCreateCache(CACHE_NAME);
+
+            // Init shared variable.
+            cache.put(QUEUE_ID, 0);
+
+            // Shared variable indicating number of jobs left to be completed.
+            cache.put(SYNC_NAME, NUM_PRODUCERS + NUM_CONSUMERS);
+
+            // Start consumers on all cluster nodes.
+            for (int i = 0; i < NUM_CONSUMERS; i++)
+                ignite.compute().withAsync().run(new Consumer(reentrantLockName));
+
+            // Start producers on all cluster nodes.
+            for (int i = 0; i < NUM_PRODUCERS; i++)
+                ignite.compute().withAsync().run(new Producer(reentrantLockName));
+
+            System.out.println("Master node is waiting for all other nodes to finish...");
+
+            // Wait for everyone to finish.
+            try {
+                lock.lock();
+
+                IgniteCondition notDone = lock.getOrCreateCondition(SYNC_NAME);
+
+                int count = cache.get(SYNC_NAME);
+
+                while(count > 0) {
+                    notDone.await();
+
+                    count = cache.get(SYNC_NAME);
+                }
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        System.out.flush();
+        System.out.println();
+        System.out.println("Finished reentrant lock example...");
+        System.out.println("Check all nodes for output (this node is also part of the cluster).");
+    }
+
+    /**
+     * Base closure which holds the name of the reentrant lock to operate on.
+     */
+    private abstract static class ReentrantLockExampleClosure implements IgniteRunnable {
+        /** Reentrant lock name. */
+        protected final String reentrantLockName;
+
+        /**
+         * @param reentrantLockName Reentrant lock name.
+         */
+        ReentrantLockExampleClosure(String reentrantLockName) {
+            this.reentrantLockName = reentrantLockName;
+        }
+    }
+
+    /**
+     * Closure which simulates producer.
+     */
+    private static class Producer extends ReentrantLockExampleClosure {
+        /**
+         * @param reentrantLockName Reentrant lock name.
+         */
+        public Producer(String reentrantLockName) {
+            super(reentrantLockName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            System.out.println("Producer started. ");
+
+            IgniteLock lock = Ignition.ignite().reentrantLock(reentrantLockName, true, false, true);
+
+            // Condition to wait on when queue is full.
+            IgniteCondition notFull = lock.getOrCreateCondition(NOT_FULL);
+
+            // Condition to wait on when queue is empty.
+            IgniteCondition notEmpty = lock.getOrCreateCondition(NOT_EMPTY);
+
+            // Signaled when job is done.
+            IgniteCondition done = lock.getOrCreateCondition(SYNC_NAME);
+
+            IgniteCache<String, Integer> cache = Ignition.ignite().cache(CACHE_NAME);
+
+            for (int i = 0; i < OPS_COUNT; i++) {
+                try {
+                    lock.lock();
+
+                    int val = cache.get(QUEUE_ID);
+
+                    while(val >= 100){
+                        System.out.println("Queue is full. Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() +
+                            " paused.");
+
+                        notFull.await();
+
+                        val = cache.get(QUEUE_ID);
+                    }
+
+                    val++;
+
+                    System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() +
+                        ", available=" + val + ']');
+
+                    cache.put(QUEUE_ID, val);
+
+                    notEmpty.signalAll();
+                }
+                finally {
+                    lock.unlock();
+                }
+            }
+
+            System.out.println("Producer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']');
+
+            try {
+                lock.lock();
+
+                int count = cache.get(SYNC_NAME);
+
+                count--;
+
+                cache.put(SYNC_NAME, count);
+
+                // Signals the master thread.
+                done.signal();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Closure which simulates consumer.
+     */
+    private static class Consumer extends ReentrantLockExampleClosure {
+        /**
+         * @param reentrantLockName ReentrantLock name.
+         */
+        public Consumer(String reentrantLockName) {
+            super(reentrantLockName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            System.out.println("Consumer started. ");
+
+            Ignite g = Ignition.ignite();
+
+            IgniteLock lock = g.reentrantLock(reentrantLockName, true, false, true);
+
+            // Condition to wait on when queue is full.
+            IgniteCondition notFull = lock.getOrCreateCondition(NOT_FULL);
+
+            // Condition to wait on when queue is empty.
+            IgniteCondition notEmpty = lock.getOrCreateCondition(NOT_EMPTY);
+
+            // Signaled when job is done.
+            IgniteCondition done = lock.getOrCreateCondition(SYNC_NAME);
+
+            IgniteCache<String, Integer> cache = g.cache(CACHE_NAME);
+
+            for (int i = 0; i < OPS_COUNT; i++) {
+                try {
+                    lock.lock();
+
+                    int val = cache.get(QUEUE_ID);
+
+                    while (val <= 0) {
+                        System.out.println("Queue is empty. Consumer [nodeId=" +
+                            Ignition.ignite().cluster().localNode().id() + " paused.");
+
+                        notEmpty.await();
+
+                        val = cache.get(QUEUE_ID);
+                    }
+
+                    val--;
+
+                    System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() +
+                        ", available=" + val + ']');
+
+                    cache.put(QUEUE_ID, val);
+
+                    notFull.signalAll();
+                }
+                finally {
+                    lock.unlock();
+                }
+            }
+
+            System.out.println("Consumer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']');
+
+            try {
+                lock.lock();
+
+                int count = cache.get(SYNC_NAME);
+
+                count--;
+
+                cache.put(SYNC_NAME, count);
+
+                // Signals the master thread.
+                done.signal();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
index 541291b..43b05b5 100644
--- a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
+++ b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.examples.datastructures.IgniteAtomicSequenceExample;
 import org.apache.ignite.examples.datastructures.IgniteAtomicStampedExample;
 import org.apache.ignite.examples.datastructures.IgniteCountDownLatchExample;
 import org.apache.ignite.examples.datastructures.IgniteQueueExample;
+import org.apache.ignite.examples.datastructures.IgniteLockExample;
 import org.apache.ignite.examples.datastructures.IgniteSemaphoreExample;
 import org.apache.ignite.examples.datastructures.IgniteSetExample;
 import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest;
@@ -100,6 +101,13 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest {
     /**
      * @throws Exception If failed.
      */
+    public void testCacheLockExample() throws Exception {
+        IgniteLockExample.main(EMPTY_ARGS);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testCacheQueueExample() throws Exception {
         IgniteQueueExample.main(EMPTY_ARGS);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 5703744..b62672e 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -466,6 +466,23 @@ public interface Ignite extends AutoCloseable {
         throws IgniteException;
 
     /**
+     * Gets or creates reentrant lock. If reentrant lock is not found in cache and {@code create} flag
+     * is {@code true}, it is created using provided name.
+     *
+     * @param name Name of the lock.
+     * @param failoverSafe {@code True} to create failover safe lock which means that
+     *      if any node leaves topology, all locks already acquired by that node are silently released
+     *      and become available for other nodes to acquire. If flag is {@code false} then
+     *      all threads on other nodes waiting to acquire lock are interrupted.
+     * @param fair If {@code True}, fair lock will be created.
+     * @param create Boolean flag indicating whether data structure should be created if it does not exist.
+     * @return ReentrantLock for the given name.
+     * @throws IgniteException If reentrant lock could not be fetched or created.
+     */
+    public IgniteLock reentrantLock(String name, boolean failoverSafe, boolean fair, boolean create)
+        throws IgniteException;
+
+    /**
      * Will get a named queue from cache and create one if it has not been created yet and {@code cfg} is not
      * {@code null}.
      * If queue is present already, queue properties will not be changed. Use

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java b/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java
new file mode 100644
index 0000000..020f23a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCondition.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+
+/**
+ * This interface provides a rich API for working with condition objects
+ * associated with distributed reentrant locks.
+ * <p>
+ * <h1 class="header">Functionality</h1>
+ * IgniteCondition provides functionality similar to {@code java.util.concurrent.locks.Condition}.
+ */
+public interface IgniteCondition extends Condition {
+    /**
+     * Name of ignite condition.
+     *
+     * @return Name of ignite condition.
+     */
+    public String name();
+
+    /**
+     * Causes the current thread to wait until it is signalled or
+     * {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>The lock associated with this {@code IgniteCondition} is atomically
+     * released and the current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until <em>one</em> of six things happens:
+     * <ul>
+     * <li>Some other thread (on any node) invokes the {@link #signal} method for this
+     * {@code Condition} and the current thread happens to be chosen as the
+     * thread to be awakened; or
+     * <li>Some other thread (on any node) invokes the {@link #signalAll} method for this
+     * {@code Condition}; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+     * current thread, and interruption of thread suspension is supported; or
+     * <li>Some other node in grid fails, and lock is created in non-failoverSafe mode; or
+     * <li>Local node is stopped; or
+     * <li>A &quot;<em>spurious wakeup</em>&quot; occurs.
+     * </ul>
+     *
+     * <p>If lock is not broken (because of failure of lock owner node)
+     * in non-failoverSafe mode and local node is alive,
+     * before this method can return the current thread must
+     * re-acquire the lock associated with this condition. When the
+     * thread returns it is <em>guaranteed</em> to hold this lock.
+     *
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * and interruption of thread suspension is supported,
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current thread's
+     * interrupted status is cleared. It is not specified, in the first
+     * case, whether or not the test for interruption occurs before the lock
+     * is released.
+     *
+     * <p><b>Implementation Considerations</b>
+     *
+     * <p>The current thread is assumed to hold the lock associated with this
+     * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException}
+     * will be thrown.
+     *
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     * @throws IgniteException if the node stopped, or
+     *         node owning the lock failed in non-failoversafe mode
+     */
+    @Override void await() throws IgniteInterruptedException, IgniteException;
+
+    /**
+     * Causes the current thread to wait until it is signalled.
+     *
+     * <p>The lock associated with this condition is atomically
+     * released and the current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until <em>one</em> of five things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #signal} method for this
+     * {@code Condition} and the current thread happens to be chosen as the
+     * thread to be awakened; or
+     * <li>Some other thread invokes the {@link #signalAll} method for this
+     * {@code Condition}; or
+     * <li>Some other node in grid fails, and lock is created in non-failoverSafe mode; or
+     * <li>Local node is stopped; or
+     * <li>A &quot;<em>spurious wakeup</em>&quot; occurs.
+     * </ul>
+     *
+     * <p>If lock is not broken (because of failure of lock owner node)
+     * in non-failoverSafe mode and local node is alive,
+     * before this method can return the current thread must
+     * re-acquire the lock associated with this condition. When the
+     * thread returns it is <em>guaranteed</em> to hold this lock.
+     *
+     * <p>If the current thread's interrupted status is set when it enters
+     * this method, or it is {@linkplain Thread#interrupt interrupted}
+     * while waiting, it will continue to wait until signalled. When it finally
+     * returns from this method its interrupted status will still
+     * be set.
+     *
+     * <p><b>Implementation Considerations</b>
+     *
+     * <p>The current thread is assumed to hold the lock associated with this
+     * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException}
+     * will be thrown.
+     *
+     * @throws IgniteException if the node stopped, or
+     *         node owning the lock failed in non-failoversafe mode
+     */
+    @Override void awaitUninterruptibly() throws IgniteException;
+
+    /**
+     * Causes the current thread to wait until it is signalled or interrupted,
+     * or the specified waiting time elapses.
+     *
+     * <p>The lock associated with this condition is atomically
+     * released and the current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until <em>one</em> of seven things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #signal} method for this
+     * {@code Condition} and the current thread happens to be chosen as the
+     * thread to be awakened; or
+     * <li>Some other thread invokes the {@link #signalAll} method for this
+     * {@code Condition}; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+     * current thread, and interruption of thread suspension is supported; or
+     * <li>The specified waiting time elapses; or
+     * <li>Some other node in grid fails, and lock is created in non-failoverSafe mode; or
+     * <li>Local node is stopped; or
+     * <li>A &quot;<em>spurious wakeup</em>&quot; occurs.
+     * </ul>
+     *
+     * <p>If lock is not broken (because of failure of lock owner node)
+     * in non-failoverSafe mode and local node is alive,
+     * before this method can return the current thread must
+     * re-acquire the lock associated with this condition. When the
+     * thread returns it is <em>guaranteed</em> to hold this lock.
+     *
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * and interruption of thread suspension is supported,
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current thread's
+     * interrupted status is cleared. It is not specified, in the first
+     * case, whether or not the test for interruption occurs before the lock
+     * is released.
+     *
+     * <p>The method returns an estimate of the number of nanoseconds
+     * remaining to wait given the supplied {@code nanosTimeout}
+     * value upon return, or a value less than or equal to zero if it
+     * timed out. This value can be used to determine whether and how
+     * long to re-wait in cases where the wait returns but an awaited
+     * condition still does not hold. Typical uses of this method take
+     * the following form:
+     *
+     *  <pre> {@code
+     * boolean aMethod(long timeout, TimeUnit unit) {
+     *   long nanos = unit.toNanos(timeout);
+     *   lock.lock();
+     *   try {
+     *     while (!conditionBeingWaitedFor()) {
+     *       if (nanos <= 0L)
+     *         return false;
+     *       nanos = theCondition.awaitNanos(nanos);
+     *     }
+     *     // ...
+     *   } finally {
+     *     lock.unlock();
+     *   }
+     * }}</pre>
+     *
+     * <p>Design note: This method requires a nanosecond argument so
+     * as to avoid truncation errors in reporting remaining times.
+     * Such precision loss would make it difficult for programmers to
+     * ensure that total waiting times are not systematically shorter
+     * than specified when re-waits occur.
+     *
+     * <p><b>Implementation Considerations</b>
+     *
+     * <p>The current thread is assumed to hold the lock associated with this
+     * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException}
+     * will be thrown.
+     *
+     * @param nanosTimeout the maximum time to wait, in nanoseconds
+     * @return an estimate of the {@code nanosTimeout} value minus
+     *         the time spent waiting upon return from this method.
+     *         A positive value may be used as the argument to a
+     *         subsequent call to this method to finish waiting out
+     *         the desired time.  A value less than or equal to zero
+     *         indicates that no time remains.
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     * @throws IgniteException if the node stopped, or
+     *         node owning the lock failed in non-failoversafe mode
+     */
+    @Override long awaitNanos(long nanosTimeout) throws IgniteInterruptedException, IgniteException;
+
+    /**
+     * Causes the current thread to wait until it is signalled or interrupted,
+     * or the specified waiting time elapses. This method is behaviorally
+     * equivalent to:
+     *  <pre> {@code awaitNanos(unit.toNanos(time)) > 0}</pre>
+     *
+     * @param time the maximum time to wait
+     * @param unit the time unit of the {@code time} argument
+     * @return {@code false} if the waiting time detectably elapsed
+     *         before return from the method, else {@code true}
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     * @throws IgniteException if the node stopped, or
+     *         node owning the lock failed in non-failoversafe mode
+     */
+    @Override boolean await(long time, TimeUnit unit) throws IgniteInterruptedException, IgniteException;
+
+    /**
+     * Causes the current thread to wait until it is signalled or interrupted,
+     * or the specified deadline elapses.
+     *
+     * <p>The lock associated with this condition is atomically
+     * released and the current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until <em>one</em> of seven things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #signal} method for this
+     * {@code Condition} and the current thread happens to be chosen as the
+     * thread to be awakened; or
+     * <li>Some other thread invokes the {@link #signalAll} method for this
+     * {@code Condition}; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+     * current thread, and interruption of thread suspension is supported; or
+     * <li>Some other node in grid fails, and lock is created in non-failoverSafe mode; or
+     * <li>Local node is stopped; or
+     * <li>The specified deadline elapses; or
+     * <li>A &quot;<em>spurious wakeup</em>&quot; occurs.
+     * </ul>
+     *
+     * <p>If lock is not broken (because of failure of lock owner node)
+     * in non-failoverSafe mode and local node is alive,
+     * before this method can return the current thread must
+     * re-acquire the lock associated with this condition. When the
+     * thread returns it is <em>guaranteed</em> to hold this lock.
+     *
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * and interruption of thread suspension is supported,
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current thread's
+     * interrupted status is cleared. It is not specified, in the first
+     * case, whether or not the test for interruption occurs before the lock
+     * is released.
+     *
+     * <p>The return value indicates whether the deadline has elapsed,
+     * which can be used as follows:
+     *  <pre> {@code
+     * boolean aMethod(Date deadline) {
+     *   boolean stillWaiting = true;
+     *   lock.lock();
+     *   try {
+     *     while (!conditionBeingWaitedFor()) {
+     *       if (!stillWaiting)
+     *         return false;
+     *       stillWaiting = theCondition.awaitUntil(deadline);
+     *     }
+     *     // ...
+     *   } finally {
+     *     lock.unlock();
+     *   }
+     * }}</pre>
+     *
+     * <p><b>Implementation Considerations</b>
+     *
+     * <p>The current thread is assumed to hold the lock associated with this
+     * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException}
+     * will be thrown.
+     *
+     * @param deadline the absolute time to wait until
+     * @return {@code false} if the deadline has elapsed upon return, else
+     *         {@code true}
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     *         (and interruption of thread suspension is supported)
+     * @throws IgniteException if the node stopped, or
+     *         node owning the lock failed in non-failoversafe mode
+     */
+    @Override boolean awaitUntil(Date deadline) throws IgniteInterruptedException, IgniteException;
+
+    /**
+     * Wakes up one waiting thread.
+     *
+     * <p>If any threads are waiting on this condition then one
+     * is selected for waking up. That thread must then re-acquire the
+     * lock before returning from {@code await}.
+     *
+     * <p><b>Implementation Considerations</b>
+     *
+     * <p>The current thread is assumed to hold the lock associated with this
+     * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException}
+     * will be thrown.
+     *
+     * @throws IgniteException if node is stopped or
+     *         node owning the lock failed in non-failoversafe mode
+     */
+    @Override void signal() throws IgniteException;
+
+    /**
+     * Wakes up all waiting threads.
+     *
+     * <p>If any threads are waiting on this condition then they are
+     * all woken up. Each thread must re-acquire the lock before it can
+     * return from {@code await}.
+     *
+     * <p><b>Implementation Considerations</b>
+     *
+     * <p>The current thread is assumed to hold the lock associated with this
+     * {@code Condition} when this method is called. If not, an {@link IllegalMonitorStateException}
+     * will be thrown.
+     *
+     * @throws IgniteException if node is stopped or
+     *         node owning the lock failed in non-failoversafe mode
+     */
+    @Override void signalAll() throws IgniteException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/main/java/org/apache/ignite/IgniteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteLock.java b/modules/core/src/main/java/org/apache/ignite/IgniteLock.java
new file mode 100644
index 0000000..2b3ad3d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteLock.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * This interface provides a rich API for working with distributed reentrant locks.
+ * <p>
+ * <h1 class="header">Functionality</h1>
+ * Distributed reentrant lock provides functionality similar to {@code java.util.concurrent.locks.ReentrantLock}.
+ * <h1 class="header">Creating Distributed ReentrantLock</h1>
+ * Instance of cache reentrant lock can be created by calling the following method:
+ * {@link Ignite#reentrantLock(String, boolean, boolean, boolean)}.
+ * <h1 class="header">Protection from failover</h1>
+ * Ignite lock can automatically recover from node failure.
+ * <ul>
+ * <li>If failoverSafe flag is set to true upon creation,
+ * in case a node owning the lock fails, lock will be automatically released and become available for threads on other
+ * nodes to acquire. No exception will be thrown.
+ * <li>If failoverSafe flag is set to false upon creation,
+ * in case a node owning the lock fails, {@code IgniteException} will be thrown on every other node attempting to
+ * perform any operation on this lock. No automatic recovery will be attempted,
+ * and lock will be marked as broken (i.e. unusable), which can be checked using the method {@link #isBroken()}.
+ * Broken lock cannot be reused again.
+ * </ul>
+ *
+ * <h1 class="header">Implementation issues</h1>
+ * Ignite lock comes in two flavours: fair and non-fair. Non-fair lock assumes no ordering should be imposed
+ * on acquiring threads; in case of contention, threads from all nodes compete for the lock once the lock is released.
+ * In most cases this is the desired behaviour. However, in some cases, using the non-fair lock can lead to uneven load
+ * distribution among nodes.
+ * Fair lock solves this issue by imposing strict FIFO ordering policy at a cost of an additional transaction.
+ * This ordering does not guarantee fairness of thread scheduling (similar to {@code java.util.concurrent.locks.ReentrantLock}).
+ * Thus, one of many threads on any node using a fair lock may obtain it multiple times in succession while other
+ * active threads are not progressing and not currently holding the lock. Also note that the untimed tryLock method
+ * does not honor the fairness setting. It will succeed if the lock is available even if other threads are waiting.
+ *
+ * </p>
+ * As a rule of thumb, whenever there is a reasonable time window between successive calls to release and acquire
+ * the lock, non-fair lock should be preferred:
+ *
+ * <pre> {@code
+ *      while(someCondition){
+ *          // do anything
+ *          lock.lock();
+ *          try{
+ *              // ...
+ *          }
+ *          finally {
+ *              lock.unlock();
+ *          }
+ *      }
+ * }</pre>
+ *
+ * If successive calls to release/acquire follow each other immediately,
+ * e.g.
+ *
+ * <pre> {@code
+ *      while(someCondition){
+ *          lock.lock();
+ *          try {
+ *              // do something
+ *          }
+ *          finally {
+ *              lock.unlock();
+ *          }
+ *      }
+ * }</pre>
+ *
+ * using the fair lock is reasonable in order to allow even distribution of load among nodes
+ * (although overall throughput may be lower due to increased overhead).
+ *
+ */
+public interface IgniteLock extends Lock, Closeable {
+    /**
+     * Name of atomic reentrant lock.
+     *
+     * @return Name of atomic reentrant lock.
+     */
+    public String name();
+
+    /**
+     * Acquires the distributed reentrant lock.
+     *
+     * <p>Acquires the lock if it is not held by another thread and returns
+     * immediately, setting the lock hold count to one.
+     *
+     * <p>If the current thread already holds this lock then the hold count
+     * is incremented by one and the method returns immediately.
+     *
+     * <p>If the lock is held by another thread then the
+     * current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until one of three things happens:
+     *
+     * <ul>
+     *
+     * <li>The lock is acquired by the current thread; or
+     *
+     * <li>Lock is broken (any node failed while owning this lock), and lock is created in
+     * non-failoverSafe mode.
+     *
+     * <li>Local node is stopped.
+     * </ul>
+     * @throws IgniteException if the node is stopped or the lock is broken in non-failoverSafe mode
+     */
+    void lock() throws IgniteException;
+
+    /**
+     * Acquires the lock unless the current thread is
+     * {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires the lock if it is not held by another thread and returns
+     * immediately, setting the lock hold count to one.
+     *
+     * <p>If the current thread already holds this lock then the hold count
+     * is incremented by one and the method returns immediately.
+     *
+     * <p>If the lock is held by another thread then the
+     * current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until one of four things happens:
+     *
+     * <ul>
+     *
+     * <li>The lock is acquired by the current thread; or
+     *
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+     * current thread.
+     *
+     * <li>Lock is broken (any node failed while owning this lock), and lock is created in
+     * non-failoverSafe mode.
+     *
+     * <li>Local node is stopped.
+     *
+     * </ul>
+     *
+     * <p>If the lock is acquired by the current thread then the lock hold
+     * count is set to one.
+     *
+     * <p>If the current thread:
+     *
+     * <ul>
+     *
+     * <li>has its interrupted status set on entry to this method; or
+     *
+     * <li>is {@linkplain Thread#interrupt interrupted} while acquiring
+     * the lock; or
+     *
+     * </ul>
+     *
+     * then {@link IgniteInterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     *
+     * <p>{@link IgniteException} is thrown in case:
+     *
+     * <ul>
+     *
+     * <li>the lock is broken before or during the attempt to acquire this lock; or
+     *
+     * <li>local node is stopped,
+     *
+     * </ul>
+     *
+     * <p>In this implementation, as this method is an explicit
+     * interruption point, preference is given to responding to the
+     * interrupt over normal or reentrant acquisition of the lock.
+     *
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     * @throws IgniteException if the lock is broken in non-failoverSafe mode (any node failed while owning this lock),
+     *         or local node is stopped
+     */
+    @Override public void lockInterruptibly() throws IgniteInterruptedException, IgniteException;
+
+    /**
+     * Acquires the lock only if it is free at the time of invocation.
+     *
+     * <p>Acquires the lock if it is available and returns immediately
+     * with the value {@code true}.
+     * If the lock is not available then this method will return
+     * immediately with the value {@code false}.
+     *
+     * <p>A typical usage idiom for this method would be:
+     *  <pre> {@code
+     * Lock lock = ...;
+     * if (lock.tryLock()) {
+     *   try {
+     *     // manipulate protected state
+     *   } finally {
+     *     lock.unlock();
+     *   }
+     * } else {
+     *   // perform alternative actions
+     * }}</pre>
+     *
+     * This usage ensures that the lock is unlocked if it was acquired, and
+     * doesn't try to unlock if the lock was not acquired.
+     *
+     * If node is stopped, or any node failed while owning the lock in non-failoverSafe mode,
+     * then {@link IgniteException} is thrown.
+     *
+     * @return {@code true} if the lock was acquired and
+     *         {@code false} otherwise
+     *
+     * @throws IgniteException if node is stopped, or lock is already broken in non-failover safe mode
+     */
+    @Override public boolean tryLock() throws IgniteException;
+
+    /**
+     * Acquires the lock if it is not held by another thread within the given
+     * waiting time and the current thread has not been
+     * {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires the lock if it is not held by another thread and returns
+     * immediately with the value {@code true}, setting the lock hold count
+     * to one.
+     *
+     * <p>If the current thread
+     * already holds this lock then the hold count is incremented by one and
+     * the method returns {@code true}.
+     *
+     * <p>If the lock is held by another thread then the
+     * current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until one of five things happens:
+     *
+     * <ul>
+     *
+     * <li>The lock is acquired by the current thread; or
+     *
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread; or
+     *
+     * <li>Lock is broken (any node failed while owning this lock), and lock is created in
+     * non-failoverSafe mode.
+     *
+     * <li>Local node is stopped.
+     *
+     * <li>The specified waiting time elapses
+     *
+     * </ul>
+     *
+     * <p>If the lock is acquired then the value {@code true} is returned and
+     * the lock hold count is set to one.
+     *
+     * <p>If the current thread:
+     *
+     * <ul>
+     *
+     * <li>has its interrupted status set on entry to this method; or
+     *
+     * <li>is {@linkplain Thread#interrupt interrupted} while
+     * acquiring the lock; or
+     *
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     *
+     * <p>{@link IgniteException} is thrown in case:
+     *
+     * <ul>
+     *
+     * <li>the lock is broken before or during the attempt to acquire this lock; or
+     *
+     * <li>local node is stopped,
+     *
+     * </ul>
+     *
+     * <p>If the specified waiting time elapses then the value {@code false}
+     * is returned.  If the time is less than or equal to zero, the method
+     * will not wait at all.
+     *
+     * <p>In this implementation, as this method is an explicit
+     * interruption point, preference is given to responding to the
+     * interrupt over normal or reentrant acquisition of the lock, and
+     * over reporting the elapse of the waiting time.
+     *
+     * @param timeout the time to wait for the lock
+     * @param unit the time unit of the timeout argument
+     * @return {@code true} if the lock was free and was acquired by the
+     *         current thread, or the lock was already held by the current
+     *         thread; and {@code false} if the waiting time elapsed before
+     *         the lock could be acquired
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     * @throws IgniteException if node is stopped, or lock is already broken in non-failover safe mode
+     * @throws NullPointerException if the time unit is null
+     */
+    @Override public boolean tryLock(long timeout, TimeUnit unit) throws IgniteInterruptedException, IgniteException;
+
+    /**
+     * Releases the lock.
+     *
+     * If lock is not owned by current thread, then an {@link
+     * IllegalMonitorStateException} is thrown.
+     * If lock is already broken prior to invocation of this method, and
+     * lock is created in non-failover safe mode, then {@link IgniteException} is thrown.
+     *
+     * @throws IllegalMonitorStateException if not owned by current thread
+     * @throws IgniteException if node is stopped, or lock is already broken in non-failover safe mode
+     */
+    void unlock() throws IgniteInterruptedException;
+
+    /**
+     * Returns a {@link Condition} instance for use with this
+     * {@link IgniteLock} instance.
+     *
+     * <ul>
+     *
+     * <li>If this lock is not held when any of the {@link Condition}
+     * {@linkplain Condition#await() waiting} or {@linkplain
+     * Condition#signal signalling} methods are called, then an {@link
+     * IllegalMonitorStateException} is thrown.
+     *
+     * <li>When the condition {@linkplain Condition#await() waiting}
+     * methods are called the lock is released and, before they
+     * return, the lock is reacquired and the lock hold count restored
+     * to what it was when the method was called.
+     *
+     * <li>If a thread is {@linkplain Thread#interrupt interrupted}
+     * while waiting then the wait will terminate, an {@link
+     * IgniteInterruptedException} will be thrown, and the thread's
+     * interrupted status will be cleared.
+     *
+     * <li> Waiting threads are signalled in FIFO order.
+     *
+     * </ul>
+     *
+     * @param name Name of the distributed condition object
+     *
+     * @return the Condition object
+     * @throws IgniteException if the lock is not initialized or already removed
+     */
+    public IgniteCondition getOrCreateCondition(String name) throws IgniteException;
+
+    /**
+     * This method is not supported in IgniteLock.
+     * Any invocation of this method will result in {@link UnsupportedOperationException}.
+     * Correct way to obtain Condition object is through method {@link IgniteLock#getOrCreateCondition(String)}.
+     *
+     */
+    @Override public Condition newCondition();
+
+    /**
+     * Queries the number of holds on this lock by the current thread.
+     *
+     * @return the number of holds on this lock by the current thread,
+     *         or zero if this lock is not held by the current thread
+     * @throws IgniteException if the lock is not initialized or already removed
+     */
+    public int getHoldCount() throws IgniteException;
+
+    /**
+     * Queries if this lock is held by the current thread.
+     *
+     * @return {@code true} if current thread holds this lock and
+     *         {@code false} otherwise
+     * @throws IgniteException if the lock is not initialized or already removed
+     */
+    public boolean isHeldByCurrentThread() throws IgniteException;
+
+    /**
+     * Queries if this lock is held by any thread on any node. This method is
+     * designed for use in monitoring of the system state,
+     * not for synchronization control.
+     *
+     * @return {@code true} if any thread on this or any other node holds this lock and
+     *         {@code false} otherwise
+     * @throws IgniteException if the lock is not initialized or already removed
+     */
+    public boolean isLocked() throws IgniteException;
+
+    /**
+     * Queries whether any threads on this node are waiting to acquire this lock. Note that
+     * because cancellations may occur at any time, a {@code true}
+     * return does not guarantee that any other thread will ever
+     * acquire this lock.  This method is designed primarily for use in
+     * monitoring of the system state.
+     *
+     * @return {@code true} if there may be other threads on this node waiting to
+     *         acquire the lock
+     * @throws IgniteException if the lock is not initialized or already removed
+     */
+    public boolean hasQueuedThreads() throws IgniteException;
+
+    /**
+     * Queries whether the given thread is waiting to acquire this
+     * lock. Note that because cancellations may occur at any time, a
+     * {@code true} return does not guarantee that this thread
+     * will ever acquire this lock.  This method is designed primarily for use
+     * in monitoring of the system state.
+     *
+     * @param thread the thread
+     * @return {@code true} if the given thread is queued waiting for this lock
+     * @throws NullPointerException if the thread is null
+     * @throws IgniteException if the lock is not initialized or already removed
+     */
+    public boolean hasQueuedThread(Thread thread) throws IgniteException;
+
+    /**
+     * Queries whether any threads on this node are waiting on the given condition
+     * associated with this lock. Note that because timeouts and
+     * interrupts may occur at any time, a {@code true} return does
+     * not guarantee that a future {@code signal} will awaken any
+     * threads.  This method is designed primarily for use in
+     * monitoring of the system state.
+     *
+     * @param condition the condition
+     * @return {@code true} if there are any waiting threads on this node
+     * @throws IllegalMonitorStateException if this lock is not held
+     * @throws IllegalArgumentException if the given condition is
+     *         not associated with this lock
+     * @throws NullPointerException if the condition is null
+     * @throws IgniteException if the lock is not initialized or already removed
+     */
+    public boolean hasWaiters(IgniteCondition condition) throws IgniteException;
+
+    /**
+     * Returns an estimate of the number of threads on this node that are waiting on the
+     * given condition associated with this lock. Note that because
+     * timeouts and interrupts may occur at any time, the estimate
+     * serves only as an upper bound on the actual number of waiters.
+     * This method is designed for use in monitoring of the system
+     * state, not for synchronization control.
+     *
+     * @param condition the condition
+     * @return the estimated number of waiting threads on this node
+     * @throws IllegalMonitorStateException if this lock is not held
+     * @throws IllegalArgumentException if the given condition is
+     *         not associated with this lock
+     * @throws NullPointerException if the condition is null
+     * @throws IgniteException if the lock is not initialized or already removed
+     */
+    public int getWaitQueueLength(IgniteCondition condition) throws IgniteException;
+
+    /**
+     * Returns {@code true} if this lock is safe to use after node failure.
+     * If not, {@link IgniteException} is thrown on every other node after node failure.
+     *
+     * @return {@code true} if this reentrant lock has failoverSafe set true
+     * @throws IgniteException if the lock is not initialized or already removed
+     */
+    public boolean isFailoverSafe();
+
+    /**
+     * Returns {@code true} if this lock is fair. Fairness flag can only be set on lock creation.
+     *
+     * @return {@code true} if this reentrant lock has fairness flag set true.
+     * @throws IgniteException if the lock is not initialized or already removed
+     */
+    public boolean isFair();
+
+    /**
+     * Returns true if any node that owned the lock failed before releasing the lock.
+     *
+     * @return true if any node failed while owning the lock since the lock on this node was initialized.
+     * @throws IgniteException if the lock is not initialized or already removed
+     */
+    public boolean isBroken() throws IgniteException;
+
+    /**
+     * Gets status of reentrant lock.
+     *
+     * @return {@code true} if reentrant lock was removed from cache, {@code false} in other case.
+     */
+    public boolean removed();
+
+    /**
+     * Removes reentrant lock.
+     *
+     * @throws IgniteException If operation failed.
+     */
+    @Override public void close();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 79d67df..1ff4543 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -876,7 +876,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        stash.set(U.readString(in));
+        U.readString(in); // Read for compatibility only. See #readResolve().
     }
 
     /**
@@ -887,7 +887,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
      */
     protected Object readResolve() throws ObjectStreamException {
         try {
-            return IgnitionEx.gridx(stash.get()).context();
+            return IgnitionEx.localIgnite().context();
         }
         catch (IllegalStateException e) {
             throw U.withCause(new InvalidObjectException(e.getMessage()), e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 5094415..0f180b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -64,6 +64,7 @@ import org.apache.ignite.IgniteFileSystem;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteLock;
 import org.apache.ignite.IgniteScheduler;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteServices;
@@ -3036,6 +3037,26 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteLock reentrantLock(
+        String name,
+        boolean failoverSafe,
+        boolean fair,
+        boolean create
+    ) {
+        guard();
+
+        try {
+            return ctx.dataStructures().reentrantLock(name, failoverSafe, fair, create);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public <T> IgniteQueue<T> queue(String name,
         int cap,
         CollectionConfiguration cfg)

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 10aa71e..1cad22f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -43,6 +43,7 @@ import org.apache.ignite.IgniteAtomicStamped;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCountDownLatch;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLock;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteQueue;
 import org.apache.ignite.IgniteSemaphore;
@@ -95,6 +96,7 @@ import static org.apache.ignite.internal.processors.datastructures.DataStructure
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.COUNT_DOWN_LATCH;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.QUEUE;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE;
+import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.REENTRANT_LOCK;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SET;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -135,6 +137,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     /** Cache contains only {@code GridCacheSemaphoreState}. */
     private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semView;
 
+    /** Cache contains only {@code GridCacheLockState}. */
+    private IgniteInternalCache<GridCacheInternalKey, GridCacheLockState> reentrantLockView;
+
     /** Cache contains only {@code GridCacheAtomicReferenceValue}. */
     private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicReferenceValue> atomicRefView;
 
@@ -177,7 +182,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         ctx.event().addLocalEventListener(
             new GridLocalEventListener() {
                 @Override public void onEvent(final Event evt) {
-                    // This may require cache operation to exectue,
+                    // This may require cache operation to execute,
                     // therefore cannot use event notification thread.
                     ctx.closure().callLocalSafe(
                         new Callable<Object>() {
@@ -189,6 +194,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                                 for (GridCacheRemovable ds : dsMap.values()) {
                                     if (ds instanceof GridCacheSemaphoreEx)
                                         ((GridCacheSemaphoreEx)ds).onNodeRemoved(leftNodeId);
+                                    else if (ds instanceof GridCacheLockEx)
+                                        ((GridCacheLockEx)ds).onNodeRemoved(leftNodeId);
                                 }
 
                                 return null;
@@ -224,6 +231,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
             semView = atomicsCache;
 
+            reentrantLockView = atomicsCache;
+
             atomicLongView = atomicsCache;
 
             atomicRefView = atomicsCache;
@@ -262,6 +271,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         for (GridCacheRemovable ds : dsMap.values()) {
             if (ds instanceof GridCacheSemaphoreEx)
                 ((GridCacheSemaphoreEx)ds).stop();
+
+            if (ds instanceof GridCacheLockEx)
+                ((GridCacheLockEx)ds).onStop();
         }
 
         if (initLatch.getCount() > 0) {
@@ -1332,6 +1344,124 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Gets or creates reentrant lock. If reentrant lock is not found in cache,
+     * it is created using provided name, failover mode, and fairness mode parameters.
+     *
+     * @param name Name of the reentrant lock.
+     * @param failoverSafe Flag indicating behaviour in case of failure.
+     * @param fair Flag indicating fairness policy of this lock.
+     * @param create If {@code true} reentrant lock will be created in case it is not in cache.
+     * @return ReentrantLock for the given name or {@code null} if it is not found and
+     *      {@code create} is false.
+     * @throws IgniteCheckedException If operation failed.
+     */
+    public IgniteLock reentrantLock(final String name, final boolean failoverSafe, final boolean fair, final boolean create)
+        throws IgniteCheckedException {
+        A.notNull(name, "name");
+
+        awaitInitialization();
+
+        checkAtomicsConfiguration();
+
+        startQuery();
+
+        return getAtomic(new IgniteOutClosureX<IgniteLock>() {
+            @Override public IgniteLock applyx() throws IgniteCheckedException {
+                GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheLockState val = cast(dsView.get(key), GridCacheLockState.class);
+
+                    // Check that reentrant lock hasn't been created in other thread yet.
+                    GridCacheLockEx reentrantLock = cast(dsMap.get(key), GridCacheLockEx.class);
+
+                    if (reentrantLock != null) {
+                        assert val != null;
+
+                        return reentrantLock;
+                    }
+
+                    if (val == null && !create)
+                        return null;
+
+                    if (val == null) {
+                        val = new GridCacheLockState(0, dsCacheCtx.nodeId(), 0, failoverSafe, fair);
+
+                        dsView.put(key, val);
+                    }
+
+                    GridCacheLockEx reentrantLock0 = new GridCacheLockImpl(
+                        name,
+                        key,
+                        reentrantLockView,
+                        dsCacheCtx);
+
+                    dsMap.put(key, reentrantLock0);
+
+                    tx.commit();
+
+                    return reentrantLock0;
+                }
+                catch (Error | Exception e) {
+                    dsMap.remove(key);
+
+                    U.error(log, "Failed to create reentrant lock: " + name, e);
+
+                    throw e;
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        }, new DataStructureInfo(name, REENTRANT_LOCK, null), create, GridCacheLockEx.class);
+    }
+
+    /**
+     * Removes reentrant lock from cache.
+     *
+     * @param name Name of the reentrant lock.
+     * @param broken Flag indicating the reentrant lock is broken and should be removed unconditionally.
+     * @throws IgniteCheckedException If operation failed.
+     */
+    public void removeReentrantLock(final String name, final boolean broken) throws IgniteCheckedException {
+        assert name != null;
+        assert dsCacheCtx != null;
+
+        awaitInitialization();
+
+        removeDataStructure(new IgniteOutClosureX<Void>() {
+            @Override public Void applyx() throws IgniteCheckedException {
+                GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    // Check correctness type of removable object.
+                    GridCacheLockState val = cast(dsView.get(key), GridCacheLockState.class);
+
+                    if (val != null) {
+                        if (val.get() > 0 && !broken)
+                            throw new IgniteCheckedException("Failed to remove reentrant lock with blocked threads. ");
+
+                        dsView.remove(key);
+
+                        tx.commit();
+                    }
+                    else
+                        tx.setRollbackOnly();
+
+                    return null;
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        }, name, REENTRANT_LOCK, null);
+    }
+
+    /**
      * Remove internal entry by key from cache.
      *
      * @param key Internal entry key.
@@ -1379,7 +1509,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
             if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED)
                 return evt.getValue() instanceof GridCacheCountDownLatchValue ||
-                    evt.getValue() instanceof GridCacheSemaphoreState;
+                    evt.getValue() instanceof GridCacheSemaphoreState ||
+                    evt.getValue() instanceof GridCacheLockState;
             else {
                 assert evt.getEventType() == EventType.REMOVED : evt;
 
@@ -1476,7 +1607,25 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                                     ", actual=" + sem.getClass() + ", value=" + sem + ']');
                         }
                     }
+                    else if (val0 instanceof GridCacheLockState) {
+                        GridCacheInternalKey key = evt.getKey();
+
+                        // Notify reentrant lock on changes.
+                        final GridCacheRemovable reentrantLock = dsMap.get(key);
+
+                        GridCacheLockState val = (GridCacheLockState)val0;
+
+                        if (reentrantLock instanceof GridCacheLockEx) {
+                            final GridCacheLockEx lock0 = (GridCacheLockEx)reentrantLock;
 
+                            lock0.onUpdate(val);
+                        }
+                        else if (reentrantLock != null) {
+                            U.error(log, "Failed to cast object " +
+                                "[expected=" + IgniteLock.class.getSimpleName() +
+                                ", actual=" + reentrantLock.getClass() + ", value=" + reentrantLock + ']');
+                        }
+                    }
                 }
                 else {
                     assert evt.getEventType() == EventType.REMOVED : evt;
@@ -1694,7 +1843,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         SET(IgniteSet.class.getSimpleName()),
 
         /** */
-        SEMAPHORE(IgniteSemaphore.class.getSimpleName());
+        SEMAPHORE(IgniteSemaphore.class.getSimpleName()),
+
+        /** */
+        REENTRANT_LOCK(IgniteLock.class.getSimpleName());
 
         /** */
         private static final DataStructureType[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9bd9a33/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java
new file mode 100644
index 0000000..0887345
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockEx.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteLock;
+
+/**
+ * Grid cache reentrant lock ({@code 'Ex'} stands for external).
+ */
+public interface GridCacheLockEx extends IgniteLock, GridCacheRemovable {
+    /**
+     * Get current reentrant lock key.
+     *
+     * @return Lock key.
+     */
+    public GridCacheInternalKey key();
+
+    /**
+     * Callback to notify reentrant lock on changes.
+     *
+     * @param state New reentrant lock state.
+     */
+    public void onUpdate(GridCacheLockState state);
+
+    /**
+     * Callback to notify reentrant lock on topology changes.
+     *
+     * @param nodeId Id of the node that left the grid.
+     */
+    public void onNodeRemoved(UUID nodeId);
+
+    /**
+     * Callback to notify local reentrant lock instance on node stop.
+     */
+    public void onStop();
+}


Mime
View raw message