ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [36/50] [abbrv] ignite git commit: ignite-638: Implement IgniteSemaphore data structure
Date Tue, 24 Nov 2015 08:20:11 GMT
ignite-638: Implement IgniteSemaphore data structure


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e7e3309
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e7e3309
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e7e3309

Branch: refs/heads/ignite-1924
Commit: 8e7e330904b80f9a13659fdd7cf7f12dd6a36037
Parents: 900788b
Author: Vladisav Jelisavcic <vladisavj@gmail.com>
Authored: Fri Nov 20 17:39:40 2015 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Fri Nov 20 17:39:40 2015 +0300

----------------------------------------------------------------------
 .../datastructures/IgniteSemaphoreExample.java  | 168 ++++
 .../ignite/examples/CacheExamplesSelfTest.java  |  10 +-
 .../src/main/java/org/apache/ignite/Ignite.java |  17 +
 .../java/org/apache/ignite/IgniteSemaphore.java | 312 ++++++++
 .../apache/ignite/events/DiscoveryEvent.java    |   6 +-
 .../apache/ignite/internal/IgniteKernal.java    |  21 +
 .../datastructures/DataStructuresProcessor.java | 199 ++++-
 .../datastructures/GridCacheSemaphoreEx.java    |  47 ++
 .../datastructures/GridCacheSemaphoreImpl.java  | 763 +++++++++++++++++++
 .../datastructures/GridCacheSemaphoreState.java | 144 ++++
 .../IgniteClientReconnectAtomicsTest.java       |  44 +-
 ...eAbstractDataStructuresFailoverSelfTest.java | 275 ++++++-
 .../IgniteClientDataStructuresAbstractTest.java |  59 +-
 .../IgniteDataStructureUniqueNameTest.java      |  14 +-
 .../IgniteSemaphoreAbstractSelfTest.java        | 411 ++++++++++
 .../local/IgniteLocalSemaphoreSelfTest.java     |  98 +++
 .../IgnitePartitionedSemaphoreSelfTest.java     |  33 +
 ...eplicatedDataStructuresFailoverSelfTest.java |   2 +-
 .../IgniteReplicatedSemaphoreSelfTest.java      |  33 +
 .../cache/GridCacheDataStructuresLoadTest.java  | 283 ++++---
 .../ignite/testframework/junits/IgniteMock.java |  10 +
 .../junits/multijvm/IgniteProcessProxy.java     |   7 +
 .../org/apache/ignite/IgniteSpringBean.java     |  12 +
 23 files changed, 2837 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
new file mode 100644
index 0000000..1c078b0
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datastructures;
+
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.lang.IgniteRunnable;
+
+/**
+ * This example demonstrates cache based semaphore.
+ * <p>
+ * Remote nodes should always be started with special configuration
+ * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code
+ * examples/config/example-ignite.xml} configuration.
+ */
+public class IgniteSemaphoreExample {
+    /** Number of items for each producer/consumer to produce/consume. */
+    private static final int OPS_COUNT = 100;
+
+    /** Number of producers. */
+    private static final int NUM_PRODUCERS = 10;
+
+    /** Number of consumers. */
+    private static final int NUM_CONSUMERS = 10;
+
+    /** Synchronization semaphore name. */
+    private static final String SEM_NAME = IgniteSemaphoreExample.class.getSimpleName();
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     */
+    public static void main(String[] args) {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache atomic semaphore example started.");
+
+            // Initialize semaphore.
+            IgniteSemaphore syncSemaphore = ignite.semaphore(SEM_NAME, 0, false, true);
+
+            // Make name of semaphore.
+            final String semaphoreName = UUID.randomUUID().toString();
+
+            // Initialize semaphore.
+            IgniteSemaphore semaphore = ignite.semaphore(semaphoreName, 0, false, true);
+
+            // Start consumers on all cluster nodes.
+            for (int i = 0; i < NUM_CONSUMERS; i++)
+                ignite.compute().withAsync().run(new Consumer(semaphoreName));
+
+            // Start producers on all cluster nodes.
+            for (int i = 0; i < NUM_PRODUCERS; i++)
+                ignite.compute().withAsync().run(new Producer(semaphoreName));
+
+            System.out.println("Master node is waiting for all other nodes to finish...");
+
+            // Wait for everyone to finish.
+            syncSemaphore.acquire(NUM_CONSUMERS + NUM_PRODUCERS);
+        }
+
+        System.out.flush();
+        System.out.println();
+        System.out.println("Finished semaphore example...");
+        System.out.println("Check all nodes for output (this node is also part of the cluster).");
+    }
+
+    /**
+     * Closure which simply waits on the latch on all nodes.
+     */
+    private abstract static class SemaphoreExampleClosure implements IgniteRunnable {
+        /** Semaphore name. */
+        protected final String semaphoreName;
+
+        /**
+         * @param semaphoreName Semaphore name.
+         */
+        SemaphoreExampleClosure(String semaphoreName) {
+            this.semaphoreName = semaphoreName;
+        }
+    }
+
+    /**
+     * Closure which simply signals the semaphore.
+     */
+    private static class Producer extends SemaphoreExampleClosure {
+        /**
+         * @param semaphoreName Semaphore name.
+         */
+        public Producer(String semaphoreName) {
+            super(semaphoreName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
+
+            for (int i = 0; i < OPS_COUNT; i++) {
+                System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() +
+                    ", available=" + semaphore.availablePermits() + ']');
+
+                // Signals others that shared resource is available.
+                semaphore.release();
+            }
+
+            System.out.println("Producer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']');
+
+            // Gets the syncing semaphore
+            IgniteSemaphore sem = Ignition.ignite().semaphore(SEM_NAME, 0, true, true);
+
+            // Signals the master thread
+            sem.release();
+        }
+    }
+
+    /**
+     * Closure which simply waits on semaphore.
+     */
+    private static class Consumer extends SemaphoreExampleClosure {
+        /**
+         * @param semaphoreName Semaphore name.
+         */
+        public Consumer(String semaphoreName) {
+            super(semaphoreName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            IgniteSemaphore sem = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
+
+            for (int i = 0; i < OPS_COUNT; i++) {
+                // Block if no permits are available.
+                sem.acquire();
+
+                System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() +
+                    ", available=" + sem.availablePermits() + ']');
+            }
+
+            System.out.println("Consumer finished [nodeId=" + Ignition.ignite().cluster().localNode().id() + ']');
+
+            // Gets the syncing semaphore
+            IgniteSemaphore sync = Ignition.ignite().semaphore(SEM_NAME, 3, true, true);
+
+            // Signals the master thread.
+            sync.release();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
index 79f404a..c11fa1a 100644
--- a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
+++ b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.examples.datastructures.IgniteAtomicReferenceExample;
 import org.apache.ignite.examples.datastructures.IgniteAtomicSequenceExample;
 import org.apache.ignite.examples.datastructures.IgniteAtomicStampedExample;
 import org.apache.ignite.examples.datastructures.IgniteCountDownLatchExample;
+import org.apache.ignite.examples.datastructures.IgniteSemaphoreExample;
 import org.apache.ignite.examples.datastructures.IgniteQueueExample;
 import org.apache.ignite.examples.datastructures.IgniteSetExample;
 import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest;
@@ -84,6 +85,13 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest {
     /**
      * @throws Exception If failed.
      */
+    public void testCacheSemaphoreExample() throws Exception {
+        IgniteSemaphoreExample.main(EMPTY_ARGS);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testCacheQueueExample() throws Exception {
         IgniteQueueExample.main(EMPTY_ARGS);
     }
@@ -150,4 +158,4 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest {
     public void testCacheContinuousQueryExample() throws Exception {
         CacheContinuousQueryExample.main(EMPTY_ARGS);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index fc9cf06..17221ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -430,6 +430,23 @@ public interface Ignite extends AutoCloseable {
         throws IgniteException;
 
     /**
+     * Gets or creates semaphore. If semaphore is not found in cache and {@code create} flag
+     * is {@code true}, it is created using provided name and count parameter.
+     *
+     * @param name Name of the semaphore.
+     * @param cnt Count for new semaphore creation. Ignored if {@code create} flag is {@code false}.
+     * @param failoverSafe {@code True} to create failover safe semaphore which means that
+     *      if any node leaves topology permits already acquired by that node are silently released
+     *      and become available for alive nodes to acquire. If flag is {@code false} then
+     *      all threads waiting for available permits get interrupted.
+     * @param create Boolean flag indicating whether data structure should be created if does not exist.
+     * @return Semaphore for the given name.
+     * @throws IgniteException If semaphore could not be fetched or created.
+     */
+    public IgniteSemaphore semaphore(String name, int cnt, boolean failoverSafe, boolean create)
+        throws IgniteException;
+
+    /**
      * Will get a named queue from cache and create one if it has not been created yet and {@code cfg} is not
      * {@code null}.
      * If queue is present already, queue properties will not be changed. Use

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
new file mode 100644
index 0000000..db748b4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This interface provides a rich API for working with distributed semaphore.
+ * <p>
+ * <h1 class="header">Functionality</h1>
+ * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}.
+ * <h1 class="header">Creating Distributed Semaphore</h1>
+ * Instance of cache semaphore can be created by calling the following method:
+ * {@link Ignite#semaphore(String, int, boolean, boolean)}.
+ */
+public interface IgniteSemaphore extends Closeable {
+    /**
+     * Gets name of the semaphore.
+     *
+     * @return Name of the semaphore.
+     */
+    public String name();
+
+    /**
+     * Acquires a permit from this semaphore, blocking until one is available, or the thread is {@linkplain
+     * Thread#interrupt interrupted}.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately, reducing the number of available permits by
+     * one.
+     *
+     * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+     * dormant until one of two things happens: <ul> <li>Some other thread invokes the {@link #release} method for this
+     * semaphore and the current thread is next to be assigned a permit; or <li>Some other thread {@linkplain
+     * Thread#interrupt interrupts} the current thread. </ul>
+     *
+     * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+     * Thread#interrupt interrupted} while waiting for a permit, </ul> then {@link IgniteInterruptedException} is thrown
+     * and the current thread's interrupted status is cleared.
+     *
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     */
+    public void acquire() throws IgniteInterruptedException;
+
+    /**
+     * Acquires a permit from this semaphore, blocking until one is available.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately, reducing the number of available permits by
+     * one.
+     *
+     * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+     * dormant until some other thread invokes the {@link #release} method for this semaphore and the current thread is
+     * next to be assigned a permit.
+     *
+     * <p>If the current thread is {@linkplain Thread#interrupt interrupted} while waiting for a permit then it will
+     * continue to wait, but the time at which the thread is assigned a permit may change compared to the time it would
+     * have received the permit had no interruption occurred.  When the thread does return from this method its
+     * interrupt status will be set.
+     */
+    public void acquireUninterruptibly();
+
+    /**
+     * Acquires a permit from this semaphore, only if one is available at the time of invocation.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately, with the value {@code true}, reducing the
+     * number of available permits by one.
+     *
+     * <p>If no permit is available then this method will return immediately with the value {@code false}.
+     *
+     * @return {@code true} if a permit was acquired and {@code false} otherwise
+     */
+    public boolean tryAcquire();
+
+    /**
+     * Acquires a permit from this semaphore, if one becomes available within the given waiting time and the current
+     * thread has not been {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately, with the value {@code true}, reducing the
+     * number of available permits by one.
+     *
+     * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+     * dormant until one of three things happens: <ul> <li>Some other thread invokes the {@link #release} method for
+     * this semaphore and the current thread is next to be assigned a permit; or <li>Some other thread {@linkplain
+     * Thread#interrupt interrupts} the current thread; or <li>The specified waiting time elapses. </ul>
+     *
+     * <p>If a permit is acquired then the value {@code true} is returned.
+     *
+     * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+     * Thread#interrupt interrupted} while waiting to acquire a permit, </ul> then {@link IgniteInterruptedException} is
+     * thrown and the current thread's interrupted status is cleared.
+     *
+     * <p>If the specified waiting time elapses then the value {@code false} is returned.  If the time is less than or
+     * equal to zero, the method will not wait at all.
+     *
+     * @param timeout the maximum time to wait for a permit
+     * @param unit the time unit of the {@code timeout} argument
+     * @return {@code true} if a permit was acquired and {@code false} if the waiting time elapsed before a permit was
+     * acquired
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     */
+    public boolean tryAcquire(long timeout, TimeUnit unit)
+        throws IgniteInterruptedException;
+
+    /**
+     * Acquires the given number of permits from this semaphore, blocking until all are available.
+     *
+     * <p>Acquires the given number of permits, if they are available, and returns immediately, reducing the number of
+     * available permits by the given amount.
+     *
+     * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+     * and lies dormant until some other thread invokes one of the {@link #release() release} methods for this
+     * semaphore, the current thread is next to be assigned permits and the number of available permits satisfies this
+     * request.
+     *
+     * <p>If the current thread is {@linkplain Thread#interrupt interrupted} while waiting for permits then it will
+     * continue to wait and its position in the queue is not affected.  When the thread does return from this method its
+     * interrupt status will be set.
+     *
+     * @param permits the number of permits to acquire
+     * @throws IllegalArgumentException if {@code permits} is negative
+     */
+    public void acquireUninterruptibly(int permits);
+
+    /**
+     * Returns the current number of permits available in this semaphore.
+     *
+     * <p>This method is typically used for debugging and testing purposes.
+     *
+     * @return the number of permits available in this semaphore
+     */
+    public int availablePermits();
+
+    /**
+     * Acquires and returns all permits that are immediately available.
+     *
+     * @return the number of permits acquired
+     */
+    public int drainPermits();
+
+    /**
+     * Releases a permit, returning it to the semaphore.
+     *
+     * <p>Releases a permit, increasing the number of available permits by one.  If any threads are trying to acquire a
+     * permit, then one is selected and given the permit that was just released.  That thread is (re)enabled for thread
+     * scheduling purposes.
+     *
+     * <p>There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link
+     * #acquire}. Correct usage of a semaphore is established by programming convention in the application.
+     */
+    public void release();
+
+    /**
+     * Acquires the given number of permits from this semaphore, if all become available within the given waiting time
+     * and the current thread has not been {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires the given number of permits, if they are available and returns immediately, with the value {@code
+     * true}, reducing the number of available permits by the given amount.
+     *
+     * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+     * and lies dormant until one of three things happens: <ul> <li>Some other thread invokes one of the {@link
+     * #release() release} methods for this semaphore, the current thread is next to be assigned permits and the number
+     * of available permits satisfies this request; or <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread; or <li>The specified waiting time elapses. </ul>
+     *
+     * <p>If the permits are acquired then the value {@code true} is returned.
+     *
+     * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+     * Thread#interrupt interrupted} while waiting to acquire the permits, </ul> then {@link IgniteInterruptedException}
+     * is thrown and the current thread's interrupted status is cleared. Any permits that were to be assigned to this
+     * thread, are instead assigned to other threads trying to acquire permits, as if the permits had been made
+     * available by a call to {@link #release()}.
+     *
+     * <p>If the specified waiting time elapses then the value {@code false} is returned.  If the time is less than or
+     * equal to zero, the method will not wait at all.  Any permits that were to be assigned to this thread, are instead
+     * assigned to other threads trying to acquire permits, as if the permits had been made available by a call to
+     * {@link #release()}.
+     *
+     * @param permits the number of permits to acquire
+     * @param timeout the maximum time to wait for the permits
+     * @param unit the time unit of the {@code timeout} argument
+     * @return {@code true} if all permits were acquired and {@code false} if the waiting time elapsed before all
+     * permits were acquired
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     * @throws IllegalArgumentException if {@code permits} is negative
+     */
+    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
+        throws IgniteInterruptedException;
+
+    /**
+     * Acquires the given number of permits from this semaphore, only if all are available at the time of invocation.
+     *
+     * <p>Acquires the given number of permits, if they are available, and returns immediately, with the value {@code
+     * true}, reducing the number of available permits by the given amount.
+     *
+     * <p>If insufficient permits are available then this method will return immediately with the value {@code false}
+     * and the number of available permits is unchanged.
+     *
+     * <p>If you want to honor the failoverSafe setting, then use {@link #tryAcquire(int, long, TimeUnit)
+     * tryAcquire(permits, 0, TimeUnit.SECONDS) } which is almost equivalent (it also detects interruption).
+     *
+     * @param permits the number of permits to acquire
+     * @return {@code true} if the permits were acquired and {@code false} otherwise
+     * @throws IllegalArgumentException if {@code permits} is negative
+     */
+    public boolean tryAcquire(int permits);
+
+    /**
+     * Acquires the given number of permits from this semaphore, blocking until all are available, or the thread is
+     * {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires the given number of permits, if they are available, and returns immediately, reducing the number of
+     * available permits by the given amount.
+     *
+     * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+     * and lies dormant until one of two things happens: <ul> <li>Some other thread invokes one of the {@link #release()
+     * release} methods for this semaphore, the current thread is next to be assigned permits and the number of
+     * available permits satisfies this request; or <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+     * current thread. </ul>
+     *
+     * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+     * Thread#interrupt interrupted} while waiting for a permit, </ul> then {@link IgniteInterruptedException} is thrown
+     * and the current thread's interrupted status is cleared. Any permits that were to be assigned to this thread are
+     * instead assigned to other threads trying to acquire permits, as if permits had been made available by a call to
+     * {@link #release()}.
+     *
+     * @param permits the number of permits to acquire
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     * @throws IllegalArgumentException if {@code permits} is negative
+     */
+    public void acquire(int permits) throws IgniteInterruptedException;
+
+    /**
+     * Releases the given number of permits, returning them to the semaphore.
+     *
+     * <p>Releases the given number of permits, increasing the number of available permits by that amount. If any
+     * threads are trying to acquire permits, then one is selected and given the permits that were just released. If the
+     * number of available permits satisfies that thread's request then that thread is (re)enabled for thread scheduling
+     * purposes; otherwise the thread will wait until sufficient permits are available. If there are still permits
+     * available after this thread's request has been satisfied, then those permits are assigned in turn to other
+     * threads trying to acquire permits.
+     *
+     * <p>There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link
+     * IgniteSemaphore#acquire acquire}. Correct usage of a semaphore is established by programming convention in the
+     * application.
+     *
+     * @param permits the number of permits to release
+     * @throws IllegalArgumentException if {@code permits} is negative
+     */
+    public void release(int permits);
+
+    /**
+     * Returns {@code true} if this semaphore is safe to use after node failure.
+     * If not, IgniteInterruptedException is thrown on every other node after node failure.
+     *
+     * @return {@code true} if this semaphore has failoverSafe set true
+     */
+    public boolean isFailoverSafe();
+
+    /**
+     * Queries whether any threads are waiting to acquire. Note that because cancellations may occur at any time, a
+     * {@code true} return does not guarantee that any other thread will ever acquire.  This method is designed
+     * primarily for use in monitoring of the system state.
+     *
+     * @return {@code true} if there may be other threads waiting to acquire the lock
+     */
+    public boolean hasQueuedThreads();
+
+    /**
+     * Returns an estimate of the number of nodes waiting to acquire. The value is only an estimate because the number
+     * of nodes that are waiting may change dynamically while this method traverses internal data structures.  This method is designed
+     * for use in monitoring of the system state, not for synchronization control.
+     *
+     * @return the estimated number of nodes waiting for this lock
+     */
+    public int getQueueLength();
+
+    /**
+     * Gets {@code broken} status of the semaphore.
+     *
+     * @return {@code True} if a node failed on this semaphore and failoverSafe flag was set to false, {@code false} otherwise.
+     */
+    public boolean isBroken();
+
+    /**
+     * Gets {@code removed} status of the semaphore.
+     *
+     * @return {@code True} if semaphore was removed from cache, {@code false} otherwise.
+     */
+    public boolean removed();
+
+    /**
+     * Removes this semaphore.
+     *
+     * @throws IgniteException If operation failed.
+     */
+    @Override public void close();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java b/modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java
index 49c4f6e..09f23bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/DiscoveryEvent.java
@@ -113,9 +113,9 @@ public class DiscoveryEvent extends EventAdapter {
     /**
      * Gets node that caused this event to be generated. It is potentially different from the node
      * on which this event was recorded. For example, node {@code A} locally recorded the event that a remote node
-     * {@code B} joined the topology. In this case this method will return ID of {@code B}.
+     * {@code B} joined the topology. In this case this method will return node {@code B}.
      *
-     * @return Event node ID.
+     * @return Event node.
      */
     public ClusterNode eventNode() {
         return evtNode;
@@ -162,4 +162,4 @@ public class DiscoveryEvent extends EventAdapter {
             "type", name(),
             "tstamp", timestamp());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index f1d67af..02096dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -58,6 +58,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.IgniteException;
@@ -2936,6 +2937,26 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteSemaphore semaphore(
+        String name,
+        int cnt,
+        boolean failoverSafe,
+        boolean create
+    ) {
+        guard();
+
+        try {
+            return ctx.dataStructures().semaphore(name, cnt, failoverSafe, create);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public <T> IgniteQueue<T> queue(String name,
         int cap,
         CollectionConfiguration cfg)

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 810bd8c..b532d7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -45,16 +45,20 @@ import org.apache.ignite.IgniteCountDownLatch;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteSet;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.configuration.AtomicConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.cache.CacheType;
@@ -82,12 +86,15 @@ import org.jsr166.ConcurrentHashMap8;
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_LONG;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_REF;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_SEQ;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_STAMPED;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.COUNT_DOWN_LATCH;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.QUEUE;
+import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SET;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -131,13 +138,16 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     /** Cache contains only {@code GridCacheCountDownLatchValue}. */
     private IgniteInternalCache<GridCacheInternalKey, GridCacheCountDownLatchValue> cntDownLatchView;
 
+    /** Cache contains only {@code GridCacheSemaphoreState}. */
+    private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semView;
+
     /** Cache contains only {@code GridCacheAtomicReferenceValue}. */
     private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicReferenceValue> atomicRefView;
 
     /** Cache contains only {@code GridCacheAtomicStampedValue}. */
     private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicStampedValue> atomicStampedView;
 
-    /** Cache contains only entry {@code GridCacheSequenceValue}.  */
+    /** Cache contains only entry {@code GridCacheSequenceValue}. */
     private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicSequenceValue> seqView;
 
     /** Cache context for atomic data structures. */
@@ -167,6 +177,37 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        super.start();
+
+        ctx.event().addLocalEventListener(
+            new GridLocalEventListener() {
+                @Override public void onEvent(final Event evt) {
+                    // This may require cache operation to exectue,
+                    // therefore cannot use event notification thread.
+                    ctx.closure().callLocalSafe(
+                        new Callable<Object>() {
+                            @Override public Object call() throws Exception {
+                                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+                                UUID leftNodeId = discoEvt.eventNode().id();
+
+                                for (GridCacheRemovable ds : dsMap.values()) {
+                                    if (ds instanceof GridCacheSemaphoreEx)
+                                        ((GridCacheSemaphoreEx)ds).onNodeRemoved(leftNodeId);
+                                }
+
+                                return null;
+                            }
+                        },
+                        false);
+                }
+            },
+            EVT_NODE_LEFT,
+            EVT_NODE_FAILED);
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void onKernalStart() throws IgniteCheckedException {
         if (ctx.config().isDaemon())
@@ -187,6 +228,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
             cntDownLatchView = atomicsCache;
 
+            semView = atomicsCache;
+
             atomicLongView = atomicsCache;
 
             atomicRefView = atomicsCache;
@@ -262,7 +305,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      *
      * @param name Sequence name.
      * @param initVal Initial value for sequence. If sequence already cached, {@code initVal} will be ignored.
-     * @param create  If {@code true} sequence will be created in case it is not in cache.
+     * @param create If {@code true} sequence will be created in case it is not in cache.
      * @return Sequence.
      * @throws IgniteCheckedException If loading failed.
      */
@@ -1194,6 +1237,124 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Gets or creates semaphore. If semaphore is not found in cache,
+     * it is created using provided name and count parameter.
+     *
+     * @param name Name of the semaphore.
+     * @param cnt Initial count.
+     * @param failoverSafe {@code True} FailoverSafe parameter.
+     * @param create If {@code true} semaphore will be created in case it is not in cache,
+     *      if it is {@code false} all parameters except {@code name} are ignored.
+     * @return Semaphore for the given name or {@code null} if it is not found and
+     *      {@code create} is false.
+     * @throws IgniteCheckedException If operation failed.
+     */
+    public IgniteSemaphore semaphore(final String name, final int cnt, final boolean failoverSafe, final boolean create)
+        throws IgniteCheckedException {
+        A.notNull(name, "name");
+
+        awaitInitialization();
+
+        checkAtomicsConfiguration();
+
+        startQuery();
+
+        return getAtomic(new IgniteOutClosureX<IgniteSemaphore>() {
+            @Override public IgniteSemaphore applyx() throws IgniteCheckedException {
+                GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
+
+                    // Check that semaphore hasn't been created in other thread yet.
+                    GridCacheSemaphoreEx sem = cast(dsMap.get(key), GridCacheSemaphoreEx.class);
+
+                    if (sem != null) {
+                        assert val != null;
+
+                        return sem;
+                    }
+
+                    if (val == null && !create)
+                        return null;
+
+                    if (val == null) {
+                        val = new GridCacheSemaphoreState(cnt, new HashMap<UUID, Integer>(), failoverSafe);
+
+                        dsView.put(key, val);
+                    }
+
+                    GridCacheSemaphoreEx sem0 = new GridCacheSemaphoreImpl(
+                        name,
+                        key,
+                        semView,
+                        dsCacheCtx);
+
+                    dsMap.put(key, sem0);
+
+                    tx.commit();
+
+                    return sem0;
+                }
+                catch (Error | Exception e) {
+                    dsMap.remove(key);
+
+                    U.error(log, "Failed to create semaphore: " + name, e);
+
+                    throw e;
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        }, new DataStructureInfo(name, SEMAPHORE, null), create, GridCacheSemaphoreEx.class);
+    }
+
+    /**
+     * Removes semaphore from cache.
+     *
+     * @param name Name of the semaphore.
+     * @throws IgniteCheckedException If operation failed.
+     */
+    public void removeSemaphore(final String name) throws IgniteCheckedException {
+        assert name != null;
+        assert dsCacheCtx != null;
+
+        awaitInitialization();
+
+        removeDataStructure(new IgniteOutClosureX<Void>() {
+            @Override public Void applyx() throws IgniteCheckedException {
+                GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    // Check correctness type of removable object.
+                    GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
+
+                    if (val != null) {
+                        if (val.getCount() < 0)
+                            throw new IgniteCheckedException("Failed to remove semaphore with blocked threads. ");
+
+                        dsView.remove(key);
+
+                        tx.commit();
+                    }
+                    else
+                        tx.setRollbackOnly();
+
+                    return null;
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        }, name, SEMAPHORE, null);
+    }
+
+    /**
      * Remove internal entry by key from cache.
      *
      * @param key Internal entry key.
@@ -1240,7 +1401,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         /** {@inheritDoc} */
         @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
             if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED)
-                return evt.getValue() instanceof GridCacheCountDownLatchValue;
+                return evt.getValue() instanceof GridCacheCountDownLatchValue ||
+                    evt.getValue() instanceof GridCacheSemaphoreState;
             else {
                 assert evt.getEventType() == EventType.REMOVED : evt;
 
@@ -1318,6 +1480,25 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                                 ", actual=" + latch.getClass() + ", value=" + latch + ']');
                         }
                     }
+                    else if (val0 instanceof GridCacheSemaphoreState) {
+                        GridCacheInternalKey key = evt.getKey();
+
+                        // Notify semaphore on changes.
+                        final GridCacheRemovable sem = dsMap.get(key);
+
+                        GridCacheSemaphoreState val = (GridCacheSemaphoreState)val0;
+
+                        if (sem instanceof GridCacheSemaphoreEx) {
+                            final GridCacheSemaphoreEx semaphore0 = (GridCacheSemaphoreEx)sem;
+
+                            semaphore0.onUpdate(val);
+                        }
+                        else if (sem != null) {
+                            U.error(log, "Failed to cast object " +
+                                    "[expected=" + IgniteSemaphore.class.getSimpleName() +
+                                    ", actual=" + sem.getClass() + ", value=" + sem + ']');
+                        }
+                    }
 
                 }
                 else {
@@ -1407,7 +1588,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      * @return Removed value.
      */
     @SuppressWarnings("unchecked")
-    @Nullable private <T> T retryRemove(final IgniteInternalCache cache, final Object key) throws IgniteCheckedException {
+    @Nullable private <T> T retryRemove(final IgniteInternalCache cache, final Object key)
+        throws IgniteCheckedException {
         return retry(log, new Callable<T>() {
             @Nullable @Override public T call() throws Exception {
                 return (T)cache.getAndRemove(key);
@@ -1432,7 +1614,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                 catch (ClusterGroupEmptyCheckedException e) {
                     throw new IgniteCheckedException(e);
                 }
-                catch (IgniteTxRollbackCheckedException | CachePartialUpdateCheckedException | ClusterTopologyCheckedException e) {
+                catch (IgniteTxRollbackCheckedException |
+                    CachePartialUpdateCheckedException |
+                    ClusterTopologyCheckedException e) {
                     if (cnt++ == MAX_UPDATE_RETRIES)
                         throw e;
                     else {
@@ -1535,7 +1719,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         QUEUE(IgniteQueue.class.getSimpleName()),
 
         /** */
-        SET(IgniteSet.class.getSimpleName());
+        SET(IgniteSet.class.getSimpleName()),
+
+        /** */
+        SEMAPHORE(IgniteSemaphore.class.getSimpleName());
 
         /** */
         private static final DataStructureType[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
new file mode 100644
index 0000000..4d39635
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteSemaphore;
+
+/**
+ * Grid cache semaphore ({@code 'Ex'} stands for external).
+ */
+public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovable {
+    /**
+     * Get current semaphore key.
+     *
+     * @return Semaphore key.
+     */
+    public GridCacheInternalKey key();
+
+    /**
+     * Callback to notify semaphore on changes.
+     *
+     * @param val State containing the number of available permissions.
+     */
+    public void onUpdate(GridCacheSemaphoreState val);
+
+    /**
+     * Callback to notify semaphore on topology changes.
+     *
+     * @param nodeId Id of the node that left the grid.
+     */
+    public void onNodeRemoved(UUID nodeId);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
new file mode 100644
index 0000000..37df9d5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Cache semaphore implementation based on AbstractQueuedSynchronizer.
+ * Current implementation supports only unfair semaphores.
+ * If any node fails after acquiring permissions on cache semaphore, there are two different behaviors controlled with the
+ * parameter failoverSafe. If this parameter is true, other nodes can reacquire permits that were acquired by the failing node.
+ * In case this parameter is false, IgniteInterruptedException is called on every node waiting on this semaphore.
+ */
+public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Deserialization stash. */
+    private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
+        new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
+            @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
+                return F.t2();
+            }
+        };
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Semaphore name. */
+    private String name;
+
+    /** Removed flag. */
+    private volatile boolean rmvd;
+
+    /** Semaphore key. */
+    private GridCacheInternalKey key;
+
+    /** Semaphore projection. */
+    private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semView;
+
+    /** Cache context. */
+    private GridCacheContext ctx;
+
+    /** Initialization guard. */
+    private final AtomicBoolean initGuard = new AtomicBoolean();
+
+    /** Initialization latch. */
+    private final CountDownLatch initLatch = new CountDownLatch(1);
+
+    /** Internal synchronization object. */
+    private Sync sync;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridCacheSemaphoreImpl() {
+        // No-op.
+    }
+
+    /**
+     * Synchronization implementation for semaphore.
+     * Uses AQS state to represent permits.
+     */
+    final class Sync extends AbstractQueuedSynchronizer {
+        private static final long serialVersionUID = 1192457210091910933L;
+
+        /** Map containing number of acquired permits for each node waiting on this semaphore. */
+        private Map<UUID, Integer> nodeMap;
+
+        /** Flag indicating that it is safe to continue after node that acquired semaphore fails. */
+        final boolean failoverSafe;
+
+        /** Flag indicating that a node failed and it is not safe to continue using this semaphore. */
+        protected boolean broken = false;
+
+        protected Sync(int permits, Map<UUID, Integer> waiters, boolean failoverSafe) {
+            setState(permits);
+            nodeMap = waiters;
+            this.failoverSafe = failoverSafe;
+        }
+
+        /**
+         * Sets a map containing number of permits acquired by each node using this semaphore. This method should only
+         * be called in {@linkplain GridCacheSemaphoreImpl#onUpdate(GridCacheSemaphoreState)}.
+         *
+         * @param nodeMap NodeMap.
+         */
+        protected synchronized void setWaiters(Map<UUID, Integer> nodeMap) {
+            this.nodeMap = nodeMap;
+        }
+
+        /**
+         * Gets the number of nodes waiting at this semaphore.
+         *
+         * @return Number of nodes waiting at this semaphore.
+         */
+        public int getWaiters() {
+            int totalWaiters = 0;
+
+            for (Integer i : nodeMap.values()) {
+                if (i > 0)
+                    totalWaiters++;
+            }
+
+            return totalWaiters;
+        }
+
+        /**
+         * Get number of permits for node.
+         *
+         * @param nodeID Node ID.
+         * @return Number of permits node has acquired at this semaphore. Can be less than 0 if
+         * more permits were released than acquired on node.
+         */
+        public int getPermitsForNode(UUID nodeID){
+            return nodeMap.containsKey(nodeID) ? nodeMap.get(nodeID) : 0;
+        }
+
+        /**
+         * Sets the number of permits currently available on this semaphore. This method should only be used in
+         * {@linkplain GridCacheSemaphoreImpl#onUpdate(GridCacheSemaphoreState)}.
+         *
+         * @param permits Number of permits available at this semaphore.
+         */
+        final synchronized void setPermits(int permits) {
+            setState(permits);
+        }
+
+        /**
+         * Gets the number of permissions currently available.
+         *
+         * @return Number of permits available at this semaphore.
+         */
+        final int getPermits() {
+            return getState();
+        }
+
+        /**
+         * This method is used by the AQS to test if the current thread should block or not.
+         *
+         * @param acquires Number of permits to acquire.
+         * @return Negative number if thread should block, positive if thread successfully acquires permits.
+         */
+        final int nonfairTryAcquireShared(int acquires) {
+            for (;;) {
+                int available = getState();
+
+                int remaining = available - acquires;
+
+                if (remaining < 0 || compareAndSetGlobalState(available, remaining, false)) {
+                    return remaining;
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected int tryAcquireShared(int acquires) {
+            return nonfairTryAcquireShared(acquires);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected final boolean tryReleaseShared(int releases) {
+            // Check if some other node updated the state.
+            // This method is called with release==0 only when trying to wake through update.
+            if (releases == 0)
+                return true;
+
+            for (;;) {
+                int cur = getState();
+
+                int next = cur + releases;
+
+                if (next < cur) // overflow
+                    throw new Error("Maximum permit count exceeded");
+
+                if (compareAndSetGlobalState(cur, next, false))
+                    return true;
+            }
+        }
+
+        /**
+         * This method is used internally to implement {@linkplain GridCacheSemaphoreImpl#drainPermits()}.
+         *
+         * @return Number of permits to drain.
+         */
+        final int drainPermits() {
+            for (;;) {
+
+                int current = getState();
+
+                if (current == 0 || compareAndSetGlobalState(current, 0, true))
+                    return current;
+            }
+        }
+
+        /**
+         * This method is used for synchronizing the semaphore state across all nodes.
+         *
+         * @param expVal Expected number of permits.
+         * @param newVal New number of permits.
+         * @param draining True if used for draining the permits.
+         * @return True if this is the call that succeeded to change the global state.
+         */
+        protected boolean compareAndSetGlobalState(final int expVal, final int newVal, final boolean draining) {
+            try {
+                return CU.outTx(
+                    retryTopologySafe(new Callable<Boolean>() {
+                        @Override public Boolean call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx,
+                                semView,
+                                    PESSIMISTIC, REPEATABLE_READ)
+                            ) {
+                                GridCacheSemaphoreState val = semView.get(key);
+
+                                if (val == null)
+                                    throw new IgniteCheckedException("Failed to find semaphore with given name: " +
+                                        name);
+
+                                boolean retVal = val.getCount() == expVal;
+
+                                if (retVal) {
+                                    // If this is not a call to drain permits,
+                                    // Modify global permission count for the calling node.
+                                    if (!draining) {
+                                        UUID nodeID = ctx.localNodeId();
+
+                                        Map<UUID,Integer> map = val.getWaiters();
+
+                                        int waitingCnt = expVal - newVal;
+
+                                        if(map.containsKey(nodeID))
+                                            waitingCnt += map.get(nodeID);
+
+                                        map.put(nodeID, waitingCnt);
+
+                                        val.setWaiters(map);
+                                    }
+
+                                    val.setCount(newVal);
+
+                                    semView.put(key, val);
+
+                                    tx.commit();
+                                }
+
+                                return retVal;
+                            }
+                            catch (Error | Exception e) {
+                                if (!ctx.kernalContext().isStopping())
+                                    U.error(log, "Failed to compare and set: " + this, e);
+
+                                throw e;
+                            }
+                        }
+                    }),
+                    ctx
+                );
+            }
+            catch (IgniteCheckedException e) {
+                if (ctx.kernalContext().isStopping()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Ignoring failure in semaphore on node left handler (node is stopping): " + e);
+
+                    return true;
+                }
+                else
+                    throw U.convertException(e);
+            }
+        }
+
+        /**
+         * This method is used for releasing the permits acquired by failing node.
+         *
+         * @param nodeId ID of the failing node.
+         * @return True if this is the call that succeeded to change the global state.
+         */
+        protected boolean releaseFailedNode(final UUID nodeId) {
+            try {
+                return CU.outTx(
+                    retryTopologySafe(new Callable<Boolean>() {
+                        @Override public Boolean call() throws Exception {
+                            try (
+                                IgniteInternalTx tx = CU.txStartInternal(ctx,
+                                    semView,
+                                    PESSIMISTIC, REPEATABLE_READ)
+                            ) {
+                                GridCacheSemaphoreState val = semView.get(key);
+
+                                if (val == null)
+                                    throw new IgniteCheckedException("Failed to find semaphore with given name: " +
+                                        name);
+
+                                Map<UUID,Integer> map = val.getWaiters();
+
+                                if(!map.containsKey(nodeId)){
+                                    tx.rollback();
+
+                                    return false;
+                                }
+
+                                int numPermits = map.get(nodeId);
+
+                                if(numPermits > 0)
+                                    val.setCount(val.getCount() + numPermits);
+
+                                map.remove(nodeId);
+
+                                val.setWaiters(map);
+
+                                semView.put(key, val);
+
+                                sync.nodeMap = map;
+
+                                tx.commit();
+
+                                return true;
+                            }
+                            catch (Error | Exception e) {
+                                if (!ctx.kernalContext().isStopping())
+                                    U.error(log, "Failed to compare and set: " + this, e);
+
+                                throw e;
+                            }
+                        }
+                    }),
+                    ctx
+                );
+            }
+            catch (IgniteCheckedException e) {
+                if (ctx.kernalContext().isStopping()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Ignoring failure in semaphore on node left handler (node is stopping): " + e);
+
+                    return true;
+                }
+                else
+                    throw U.convertException(e);
+            }
+        }
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param name Semaphore name.
+     * @param key Semaphore key.
+     * @param semView Semaphore projection.
+     * @param ctx Cache context.
+     */
+    public GridCacheSemaphoreImpl(
+        String name,
+        GridCacheInternalKey key,
+        IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semView,
+        GridCacheContext ctx
+    ) {
+        assert name != null;
+        assert key != null;
+        assert semView != null;
+        assert ctx != null;
+
+        this.name = name;
+        this.key = key;
+        this.semView = semView;
+        this.ctx = ctx;
+
+        log = ctx.logger(getClass());
+    }
+
+    /**
+     * @throws IgniteCheckedException If operation failed.
+     */
+    private void initializeSemaphore() throws IgniteCheckedException {
+        if (!initGuard.get() && initGuard.compareAndSet(false, true)) {
+            try {
+                sync = CU.outTx(
+                    retryTopologySafe(new Callable<Sync>() {
+                        @Override public Sync call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx,
+                                semView, PESSIMISTIC, REPEATABLE_READ)) {
+                                GridCacheSemaphoreState val = semView.get(key);
+
+                                if (val == null) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to find semaphore with given name: " + name);
+
+                                    return null;
+                                }
+
+                                final int count = val.getCount();
+
+                                Map<UUID, Integer> waiters = val.getWaiters();
+
+                                final boolean failoverSafe = val.isFailoverSafe();
+
+                                tx.commit();
+
+                                return new Sync(count, waiters, failoverSafe);
+                            }
+                        }
+                    }),
+                    ctx
+                );
+
+                if (log.isDebugEnabled())
+                    log.debug("Initialized internal sync structure: " + sync);
+            }
+            finally {
+                initLatch.countDown();
+            }
+        }
+        else {
+            U.await(initLatch);
+
+            if (sync == null)
+                throw new IgniteCheckedException("Internal semaphore has not been properly initialized.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheInternalKey key() {
+        return key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean removed() {
+        return rmvd;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onRemoved() {
+        return rmvd = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onUpdate(GridCacheSemaphoreState val) {
+        if (sync == null)
+            return;
+
+        // Update permission count.
+        sync.setPermits(val.getCount());
+
+        // Update waiters' counts.
+        sync.setWaiters(val.getWaiters());
+
+        // Try to notify any waiting threads.
+        sync.releaseShared(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onNodeRemoved(UUID nodeId) {
+        int numPermits = sync.getPermitsForNode(nodeId);
+
+        if (numPermits > 0) {
+            if (sync.failoverSafe)
+                // Release permits acquired by threads on failing node.
+                sync.releaseFailedNode(nodeId);
+            else {
+                // Interrupt every waiting thread if this semaphore is not failover safe.
+                sync.broken = true;
+
+                for (Thread t : sync.getSharedQueuedThreads())
+                    t.interrupt();
+
+                // Try to notify any waiting threads.
+                sync.releaseShared(0);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void needCheckNotRemoved() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acquire() throws IgniteInterruptedException {
+        acquire(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acquire(int permits) throws IgniteInterruptedException {
+        A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
+        try {
+            initializeSemaphore();
+
+            if(isBroken())
+                Thread.currentThread().interrupt();
+
+            sync.acquireSharedInterruptibly(permits);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInterruptedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acquireUninterruptibly() {
+        try {
+            initializeSemaphore();
+
+            sync.acquireShared(1);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void acquireUninterruptibly(int permits) {
+        A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
+        try {
+            initializeSemaphore();
+
+            sync.acquireShared(permits);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int availablePermits() {
+        int ret;
+        try {
+            initializeSemaphore();
+
+            ret = CU.outTx(
+                retryTopologySafe(new Callable<Integer>() {
+                    @Override public Integer call() throws Exception {
+                        try (
+                            IgniteInternalTx tx = CU.txStartInternal(ctx,
+                                semView, PESSIMISTIC, REPEATABLE_READ)
+                        ) {
+                            GridCacheSemaphoreState val = semView.get(key);
+
+                            if (val == null)
+                                throw new IgniteException("Failed to find semaphore with given name: " + name);
+
+                            int count = val.getCount();
+
+                            tx.rollback();
+
+                            return count;
+                        }
+                    }
+                }),
+                ctx
+            );
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+
+        return ret;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int drainPermits() {
+        try {
+            initializeSemaphore();
+
+            return sync.drainPermits();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tryAcquire() {
+        try {
+            initializeSemaphore();
+
+            return sync.nonfairTryAcquireShared(1) >= 0;
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
+        try {
+            initializeSemaphore();
+
+            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInterruptedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void release() {
+        release(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void release(int permits) {
+        A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
+        try {
+            initializeSemaphore();
+
+            sync.releaseShared(permits);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tryAcquire(int permits) {
+        A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
+        try {
+            initializeSemaphore();
+
+            return sync.nonfairTryAcquireShared(permits) >= 0;
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws IgniteInterruptedException {
+        A.ensure(permits >= 0, "Number of permits must be non-negative.");
+        try {
+            initializeSemaphore();
+
+            return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInterruptedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isFailoverSafe() {
+        return sync.failoverSafe;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasQueuedThreads() {
+        try {
+            initializeSemaphore();
+
+            return sync.getWaiters() != 0;
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getQueueLength() {
+        try {
+            initializeSemaphore();
+
+            return sync.getWaiters();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isBroken(){
+        return sync.broken;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(ctx.kernalContext());
+        out.writeUTF(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        IgniteBiTuple<GridKernalContext, String> t = stash.get();
+
+        t.set1((GridKernalContext)in.readObject());
+        t.set2(in.readUTF());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (!rmvd) {
+            try {
+                ctx.kernalContext().dataStructures().removeSemaphore(name);
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+        }
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheSemaphoreImpl.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
new file mode 100644
index 0000000..50cdf10
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Grid cache semaphore state.
+ */
+public class GridCacheSemaphoreState implements GridCacheInternal, Externalizable, Cloneable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Permission count. */
+    private int cnt;
+
+    /** Map containing number of acquired permits for each node waiting on this semaphore. */
+    @GridToStringInclude
+    private Map<UUID, Integer> waiters;
+
+    /** FailoverSafe flag. */
+    private boolean failoverSafe;
+
+    /**
+     * Constructor.
+     *
+     * @param cnt Number of permissions.
+     * @param waiters Waiters map.
+     * @param failoverSafe Failover safe flag.
+     */
+    public GridCacheSemaphoreState(int cnt, @Nullable Map<UUID,Integer> waiters, boolean failoverSafe) {
+        this.cnt = cnt;
+        this.waiters = waiters;
+        this.failoverSafe = failoverSafe;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridCacheSemaphoreState() {
+        // No-op.
+    }
+
+    /**
+     * @param cnt New count.
+     */
+    public void setCount(int cnt) {
+        this.cnt = cnt;
+    }
+
+    /**
+     * @return Current count.
+     */
+    public int getCount() {
+        return cnt;
+    }
+
+    /**
+     * @return Waiters.
+     */
+    public Map<UUID,Integer> getWaiters() {
+        return waiters;
+    }
+
+    /**
+     * @param waiters Map containing the number of permissions acquired by each node.
+     */
+    public void setWaiters(Map<UUID, Integer> waiters) {
+        this.waiters = waiters;
+    }
+
+    /**
+     * @return failoverSafe flag.
+     */
+    public boolean isFailoverSafe() {
+        return failoverSafe;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(cnt);
+        out.writeBoolean(failoverSafe);
+        out.writeBoolean(waiters != null);
+
+        if (waiters != null) {
+            out.writeInt(waiters.size());
+
+            for (Map.Entry<UUID, Integer> e : waiters.entrySet()) {
+                U.writeUuid(out, e.getKey());
+                out.writeInt(e.getValue());
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException {
+        cnt = in.readInt();
+        failoverSafe = in.readBoolean();
+
+        if (in.readBoolean()) {
+            int size = in.readInt();
+
+            waiters = U.newHashMap(size);
+
+            for (int i = 0; i < size; i++)
+                waiters.put(U.readUuid(in), in.readInt());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheSemaphoreState.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
index 55dbb57..c46b5c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteAtomicStamped;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
 import org.apache.ignite.testframework.GridTestUtils;
 
@@ -675,4 +676,45 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr
         assertTrue(srvLatch.await(1000));
         assertTrue(clientLatch.await(1000));
     }
-}
\ No newline at end of file
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSemaphoreReconnect() throws Exception {
+        Ignite client = grid(serverCount());
+
+        assertTrue(client.cluster().localNode().isClient());
+
+        Ignite srv = clientRouter(client);
+
+        IgniteSemaphore clientSemaphore = client.semaphore("semaphore1", 3, false, true);
+
+        assertEquals(3, clientSemaphore.availablePermits());
+
+        final IgniteSemaphore srvSemaphore = srv.semaphore("semaphore1", 3, false, false);
+
+        assertEquals(3, srvSemaphore.availablePermits());
+
+        reconnectClientNode(client, srv, new Runnable() {
+            @Override public void run() {
+                srvSemaphore.acquire();
+            }
+        });
+
+        assertEquals(2, srvSemaphore.availablePermits());
+        assertEquals(2, clientSemaphore.availablePermits());
+
+        srvSemaphore.acquire();
+
+        assertEquals(1, srvSemaphore.availablePermits());
+        assertEquals(1, clientSemaphore.availablePermits());
+
+        clientSemaphore.acquire();
+
+        assertEquals(0, srvSemaphore.availablePermits());
+        assertEquals(0, clientSemaphore.availablePermits());
+
+        assertFalse(srvSemaphore.tryAcquire());
+        assertFalse(srvSemaphore.tryAcquire());
+    }
+}


Mime
View raw message