ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [1/6] ignite git commit: Implements distributed semaphore ignite-638
Date Fri, 23 Oct 2015 16:10:42 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-638 [created] 5f38a18e1


Implements distributed semaphore ignite-638

The new IgniteSemaphore interface provides a rich API for working with a distributed semaphore.
A distributed semaphore provides functionality similar to java.util.concurrent.Semaphore.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e9567ade
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e9567ade
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e9567ade

Branch: refs/heads/ignite-638
Commit: e9567adec4057b0a0528735d4934440f4cd9fee3
Parents: b08e0b2
Author: vladisav <vladisavj@gmail.com>
Authored: Fri Sep 25 11:14:18 2015 +0200
Committer: vladisav <vladisavj@gmail.com>
Committed: Thu Oct 1 04:26:48 2015 +0200

----------------------------------------------------------------------
 .../datastructures/IgniteSemaphoreExample.java  | 183 ++++++
 .../src/main/java/org/apache/ignite/Ignite.java |  14 +
 .../java/org/apache/ignite/IgniteSemaphore.java | 390 ++++++++++++
 .../apache/ignite/internal/IgniteKernal.java    |  19 +
 .../datastructures/DataStructuresProcessor.java | 161 ++++-
 .../datastructures/GridCacheSemaphoreEx.java    |  22 +
 .../datastructures/GridCacheSemaphoreImpl.java  | 619 +++++++++++++++++++
 .../datastructures/GridCacheSemaphoreState.java | 128 ++++
 .../datastructures/GridCacheSemaphoreValue.java | 115 ++++
 .../ignite/testframework/junits/IgniteMock.java |  10 +
 .../junits/multijvm/IgniteProcessProxy.java     |   7 +
 .../org/apache/ignite/IgniteSpringBean.java     |  11 +
 12 files changed, 1677 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
new file mode 100644
index 0000000..2ef242c
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
@@ -0,0 +1,183 @@
+package org.apache.ignite.examples.datastructures;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.UUID;
+
+/**
+ * This example demonstrates a cache-based distributed semaphore in a producer/consumer scenario.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
+ *
+ * @author Vladisav Jelisavcic
+ */
+public class IgniteSemaphoreExample {
+    /** Cache name. */
+    private static final String CACHE_NAME = IgniteSemaphoreExample.class.getSimpleName();
+
+    /** Number of items for each producer/consumer to produce/consume. */
+    private static final int ITEM_COUNT = 100;
+
+    /** Number of producers. */
+    private static final int NUM_PRODUCERS = 10;
+
+    /** Number of consumers. */
+    private static final int NUM_CONSUMERS = 10;
+
+    /** Synchronization semaphore name. */
+    private static final String syncName = IgniteSemaphoreExample.class.getSimpleName();
+
+    public static void main(String[] args) {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache atomic semaphore example started.");
+
+            // Initialize the synchronization semaphore the master node waits on until all closures finish.
+            IgniteSemaphore syncSemaphore = ignite.semaphore(syncName,0,false,true);
+
+            // Make name of semaphore.
+            final String semaphoreName = UUID.randomUUID().toString();
+
+            // Make name of mutex
+            final String mutexName = UUID.randomUUID().toString();
+
+            // Make shared resource
+            final String resourceName = UUID.randomUUID().toString();
+            IgniteCache<String, Queue<String>> cache = ignite.getOrCreateCache(CACHE_NAME);
+            cache.put(resourceName, new LinkedList<>());
+
+            // Initialize the semaphore that counts the items available for consumption.
+            IgniteSemaphore semaphore = ignite.semaphore(semaphoreName, 0, false, true);
+
+            // Initialize the mutex (a semaphore with a single permit) that guards the shared queue.
+            IgniteSemaphore mutex = ignite.semaphore(mutexName,1,false,true);
+
+            // Start consumers on all cluster nodes.
+            for (int i = 0; i < NUM_CONSUMERS; i++)
+                ignite.compute().withAsync().run(new Consumer(mutexName, semaphoreName, resourceName));
+
+            // Start producers on all cluster nodes.
+            for(int i = 0; i < NUM_PRODUCERS; i++)
+                ignite.compute().withAsync().run(new Producer(mutexName, semaphoreName, resourceName));
+
+            System.out.println("Master node is waiting for all other nodes to finish...");
+
+            // Wait for everyone to finish.
+            syncSemaphore.acquire(NUM_CONSUMERS + NUM_PRODUCERS);
+        }
+
+        System.out.flush();
+        System.out.println();
+        System.out.println("Finished semaphore example...");
+        System.out.println("Check all nodes for output (this node is also part of the cluster).");
+    }
+
+    /**
+     * Base closure which holds the names of the semaphore, the mutex and the shared resource.
+     */
+    private abstract static class SemaphoreExampleClosure implements IgniteRunnable {
+        /** Semaphore name. */
+        protected final String semaphoreName;
+
+        /** Mutex name. */
+        protected final String mutexName;
+
+        /** Resource name. */
+        protected final String resourceName;
+
+        /**
+         * @param mutexName Mutex name.
+         * @param semaphoreName Semaphore name.
+         * @param resourceName Resource name.
+         */
+        SemaphoreExampleClosure(String mutexName, String semaphoreName, String resourceName) {
+            this.semaphoreName = semaphoreName;
+            this.mutexName = mutexName;
+            this.resourceName = resourceName;
+        }
+    }
+
+    /**
+     * Closure which adds items to the shared queue, signaling the semaphore after each one.
+     */
+    private static class Producer extends SemaphoreExampleClosure {
+
+        /**
+         * @param mutexName Mutex name.
+         * @param semaphoreName Semaphore name.
+         * @param resourceName Resource name.
+         */
+        public Producer(String mutexName, String semaphoreName, String resourceName) {
+            super(mutexName, semaphoreName, resourceName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
+            IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName,0,true,true);
+
+            for(int i=0;i<ITEM_COUNT;i++) {
+                mutex.acquire();
+
+                Queue<String> queue = (Queue<String>) Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+                queue.add(Ignition.ignite().cluster().localNode().id().toString());
+                Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
+                System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] produced data. Available: " + semaphore.availablePermits());
+
+                mutex.release();
+
+                semaphore.release();
+            }
+
+            System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
+            IgniteSemaphore sync =  Ignition.ignite().semaphore(syncName, 0, true, true);
+            sync.release();
+        }
+    }
+
+    /**
+     * Closure which waits on the semaphore and then takes an item from the shared queue.
+     */
+    private static class Consumer extends SemaphoreExampleClosure {
+
+        /**
+         * @param mutexName Mutex name.
+         * @param semaphoreName Semaphore name.
+         * @param resourceName Resource name.
+         */
+        public Consumer(String mutexName, String semaphoreName, String resourceName) {
+            super(mutexName, semaphoreName, resourceName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
+            IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName,0,true,true);
+
+            for(int i=0;i<ITEM_COUNT;i++) {
+                semaphore.acquire();
+
+                mutex.acquire();
+
+                Queue<String> queue = (Queue<String>) Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+                String data = queue.remove();
+                Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
+                System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] consumed data generated by producer [nodeId=" + data + "]");
+
+                mutex.release();
+            }
+
+            System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
+            IgniteSemaphore sync =  Ignition.ignite().semaphore(syncName, 3, true, true);
+            sync.release();
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 0afccd0..dffb126 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -420,6 +420,20 @@ public interface Ignite extends AutoCloseable {
         throws IgniteException;
 
     /**
+     * Gets or creates a semaphore. If the semaphore is not found in cache and the {@code create} flag
+     * is {@code true}, it is created using the provided name and count parameters.
+     *
+     * @param name Name of the semaphore.
+     * @param cnt Count for new semaphore creation. Ignored if {@code create} flag is {@code false}.
+     * @param fair {@code True} to enable fairness.
+     * @param create Boolean flag indicating whether the data structure should be created if it does not exist.
+     * @return Semaphore for the given name.
+     * @throws IgniteException If semaphore could not be fetched or created.
+     */
+    public IgniteSemaphore semaphore(String name, int cnt, boolean fair, boolean create)
+            throws IgniteException;
+
+    /**
      * Will get a named queue from cache and create one if it has not been created yet and {@code cfg} is not
      * {@code null}.
      * If queue is present already, queue properties will not be changed. Use

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
new file mode 100644
index 0000000..0e29d00
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
@@ -0,0 +1,390 @@
+package org.apache.ignite;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This interface provides a rich API for working with distributed semaphore.
+ * <p>
+ * <h1 class="header">Functionality</h1>
+ * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}.
+ * <h1 class="header">Creating Distributed Semaphore</h1>
+ * An instance of the distributed semaphore can be obtained by calling the following method:
+ * {@link Ignite#semaphore(String, int, boolean, boolean)}.
+ *
+ * @author Vladisav Jelisavcic
+ */
+public interface IgniteSemaphore extends Closeable{
+    /**
+     * Gets name of the semaphore.
+     *
+     * @return Name of the semaphore.
+     */
+    public String name();
+
+    /**
+     * Acquires a permit from this semaphore, blocking until one is
+     * available, or the thread is {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately,
+     * reducing the number of available permits by one.
+     *
+     * <p>If no permit is available then the current thread becomes
+     * disabled for thread scheduling purposes and lies dormant until
+     * one of two things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #release} method for this
+     * semaphore and the current thread is next to be assigned a permit; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread.
+     * </ul>
+     *
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * for a permit,
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     *
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     */
+    public void acquire() throws IgniteInterruptedException;
+
+    /**
+     * Acquires a permit from this semaphore, blocking until one is
+     * available.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately,
+     * reducing the number of available permits by one.
+     *
+     * <p>If no permit is available then the current thread becomes
+     * disabled for thread scheduling purposes and lies dormant until
+     * some other thread invokes the {@link #release} method for this
+     * semaphore and the current thread is next to be assigned a permit.
+     *
+     * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
+     * while waiting for a permit then it will continue to wait, but the
+     * time at which the thread is assigned a permit may change compared to
+     * the time it would have received the permit had no interruption
+     * occurred.  When the thread does return from this method its interrupt
+     * status will be set.
+     */
+    public void acquireUninterruptibly();
+
+    /**
+     * Acquires a permit from this semaphore, only if one is available at the
+     * time of invocation.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately,
+     * with the value {@code true},
+     * reducing the number of available permits by one.
+     *
+     * <p>If no permit is available then this method will return
+     * immediately with the value {@code false}.
+     *
+     * <p>Even when this semaphore has been set to use a
+     * fair ordering policy, a call to {@code tryAcquire()} <em>will</em>
+     * immediately acquire a permit if one is available, whether or not
+     * other threads are currently waiting.
+     * This &quot;barging&quot; behavior can be useful in certain
+     * circumstances, even though it breaks fairness. If you want to honor
+     * the fairness setting, then use
+     * {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) }
+     * which is almost equivalent (it also detects interruption).
+     *
+     * @return {@code true} if a permit was acquired and {@code false}
+     *         otherwise
+     */
+    public boolean tryAcquire();
+
+    /**
+     * Acquires a permit from this semaphore, if one becomes available
+     * within the given waiting time and the current thread has not
+     * been {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately,
+     * with the value {@code true},
+     * reducing the number of available permits by one.
+     *
+     * <p>If no permit is available then the current thread becomes
+     * disabled for thread scheduling purposes and lies dormant until
+     * one of three things happens:
+     * <ul>
+     * <li>Some other thread invokes the {@link #release} method for this
+     * semaphore and the current thread is next to be assigned a permit; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread; or
+     * <li>The specified waiting time elapses.
+     * </ul>
+     *
+     * <p>If a permit is acquired then the value {@code true} is returned.
+     *
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * to acquire a permit,
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     *
+     * <p>If the specified waiting time elapses then the value {@code false}
+     * is returned.  If the time is less than or equal to zero, the method
+     * will not wait at all.
+     *
+     * @param timeout the maximum time to wait for a permit
+     * @param unit the time unit of the {@code timeout} argument
+     * @return {@code true} if a permit was acquired and {@code false}
+     *         if the waiting time elapsed before a permit was acquired
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     */
+    public boolean tryAcquire(long timeout, TimeUnit unit)
+            throws IgniteInterruptedException;
+
+    /**
+     * Acquires the given number of permits from this semaphore,
+     * blocking until all are available.
+     *
+     * <p>Acquires the given number of permits, if they are available,
+     * and returns immediately, reducing the number of available permits
+     * by the given amount.
+     *
+     * <p>If insufficient permits are available then the current thread becomes
+     * disabled for thread scheduling purposes and lies dormant until
+     * some other thread invokes one of the {@link #release() release}
+     * methods for this semaphore, the current thread is next to be assigned
+     * permits and the number of available permits satisfies this request.
+     *
+     * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
+     * while waiting for permits then it will continue to wait and its
+     * position in the queue is not affected.  When the thread does return
+     * from this method its interrupt status will be set.
+     *
+     * @param permits the number of permits to acquire
+     * @throws IllegalArgumentException if {@code permits} is negative
+     */
+    public void acquireUninterruptibly(int permits);
+
+
+    /**
+     * Returns the current number of permits available in this semaphore.
+     *
+     * <p>This method is typically used for debugging and testing purposes.
+     *
+     * @return the number of permits available in this semaphore
+     */
+    public int availablePermits();
+
+    /**
+     * Acquires and returns all permits that are immediately available.
+     *
+     * @return the number of permits acquired
+     */
+    public int drainPermits();
+
+    /**
+     * Releases a permit, returning it to the semaphore.
+     *
+     * <p>Releases a permit, increasing the number of available permits by
+     * one.  If any threads are trying to acquire a permit, then one is
+     * selected and given the permit that was just released.  That thread
+     * is (re)enabled for thread scheduling purposes.
+     *
+     * <p>There is no requirement that a thread that releases a permit must
+     * have acquired that permit by calling {@link #acquire}.
+     * Correct usage of a semaphore is established by programming convention
+     * in the application.
+     */
+    public void release();
+
+    /**
+     * Acquires the given number of permits from this semaphore, if all
+     * become available within the given waiting time and the current
+     * thread has not been {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires the given number of permits, if they are available and
+     * returns immediately, with the value {@code true},
+     * reducing the number of available permits by the given amount.
+     *
+     * <p>If insufficient permits are available then
+     * the current thread becomes disabled for thread scheduling
+     * purposes and lies dormant until one of three things happens:
+     * <ul>
+     * <li>Some other thread invokes one of the {@link #release() release}
+     * methods for this semaphore, the current thread is next to be assigned
+     * permits and the number of available permits satisfies this request; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread; or
+     * <li>The specified waiting time elapses.
+     * </ul>
+     *
+     * <p>If the permits are acquired then the value {@code true} is returned.
+     *
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * to acquire the permits,
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     * Any permits that were to be assigned to this thread, are instead
+     * assigned to other threads trying to acquire permits, as if
+     * the permits had been made available by a call to {@link #release()}.
+     *
+     * <p>If the specified waiting time elapses then the value {@code false}
+     * is returned.  If the time is less than or equal to zero, the method
+     * will not wait at all.  Any permits that were to be assigned to this
+     * thread, are instead assigned to other threads trying to acquire
+     * permits, as if the permits had been made available by a call to
+     * {@link #release()}.
+     *
+     * @param permits the number of permits to acquire
+     * @param timeout the maximum time to wait for the permits
+     * @param unit the time unit of the {@code timeout} argument
+     * @return {@code true} if all permits were acquired and {@code false}
+     *         if the waiting time elapsed before all permits were acquired
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     * @throws IllegalArgumentException if {@code permits} is negative
+     */
+    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
+            throws IgniteInterruptedException;
+
+    /**
+     * Acquires the given number of permits from this semaphore, only
+     * if all are available at the time of invocation.
+     *
+     * <p>Acquires the given number of permits, if they are available, and
+     * returns immediately, with the value {@code true},
+     * reducing the number of available permits by the given amount.
+     *
+     * <p>If insufficient permits are available then this method will return
+     * immediately with the value {@code false} and the number of available
+     * permits is unchanged.
+     *
+     * <p>Even when this semaphore has been set to use a fair ordering
+     * policy, a call to {@code tryAcquire} <em>will</em>
+     * immediately acquire a permit if one is available, whether or
+     * not other threads are currently waiting.  This
+     * &quot;barging&quot; behavior can be useful in certain
+     * circumstances, even though it breaks fairness. If you want to
+     * honor the fairness setting, then use {@link #tryAcquire(int,
+     * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) }
+     * which is almost equivalent (it also detects interruption).
+     *
+     * @param permits the number of permits to acquire
+     * @return {@code true} if the permits were acquired and
+     *         {@code false} otherwise
+     * @throws IllegalArgumentException if {@code permits} is negative
+     */
+    public boolean tryAcquire(int permits);
+
+    /**
+     * Acquires the given number of permits from this semaphore,
+     * blocking until all are available,
+     * or the thread is {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires the given number of permits, if they are available,
+     * and returns immediately, reducing the number of available permits
+     * by the given amount.
+     *
+     * <p>If insufficient permits are available then the current thread becomes
+     * disabled for thread scheduling purposes and lies dormant until
+     * one of two things happens:
+     * <ul>
+     * <li>Some other thread invokes one of the {@link #release() release}
+     * methods for this semaphore, the current thread is next to be assigned
+     * permits and the number of available permits satisfies this request; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread.
+     * </ul>
+     *
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
+     * for a permit,
+     * </ul>
+     * then {@link IgniteInterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     * Any permits that were to be assigned to this thread are instead
+     * assigned to other threads trying to acquire permits, as if
+     * permits had been made available by a call to {@link #release()}.
+     *
+     * @param permits the number of permits to acquire
+     * @throws IgniteInterruptedException if the current thread is interrupted
+     * @throws IllegalArgumentException if {@code permits} is negative
+     */
+    public void acquire(int permits) throws IgniteInterruptedException;
+
+    /**
+     * Releases the given number of permits, returning them to the semaphore.
+     *
+     * <p>Releases the given number of permits, increasing the number of
+     * available permits by that amount.
+     * If any threads are trying to acquire permits, then one
+     * is selected and given the permits that were just released.
+     * If the number of available permits satisfies that thread's request
+     * then that thread is (re)enabled for thread scheduling purposes;
+     * otherwise the thread will wait until sufficient permits are available.
+     * If there are still permits available
+     * after this thread's request has been satisfied, then those permits
+     * are assigned in turn to other threads trying to acquire permits.
+     *
+     * <p>There is no requirement that a thread that releases a permit must
+     * have acquired that permit by calling {@link IgniteSemaphore#acquire acquire}.
+     * Correct usage of a semaphore is established by programming convention
+     * in the application.
+     *
+     * @param permits the number of permits to release
+     * @throws IllegalArgumentException if {@code permits} is negative
+     */
+    public void release(int permits);
+
+    /**
+     * Returns {@code true} if this semaphore has fairness set true.
+     *
+     * @return {@code true} if this semaphore has fairness set true
+     */
+    public boolean isFair();
+
+    /**
+     * Queries whether any threads are waiting to acquire. Note that
+     * because cancellations may occur at any time, a {@code true}
+     * return does not guarantee that any other thread will ever
+     * acquire.  This method is designed primarily for use in
+     * monitoring of the system state.
+     *
+     * @return {@code true} if there may be other threads waiting to
+     *         acquire a permit
+     */
+    public boolean hasQueuedThreads();
+
+    /**
+     * Returns an estimate of the number of threads waiting to acquire.
+     * The value is only an estimate because the number of threads may
+     * change dynamically while this method traverses internal data
+     * structures.  This method is designed for use in monitoring of the
+     * system state, not for synchronization control.
+     *
+     * @return the estimated number of threads waiting to acquire permits
+     */
+    public int getQueueLength();
+
+    /**
+     * Gets {@code removed} status of the semaphore.
+     *
+     * @return {@code True} if semaphore was removed from cache, {@code false} otherwise.
+     */
+    public boolean removed();
+
+    /**
+     * Removes this semaphore.
+     *
+     * @throws IgniteException If operation failed.
+     */
+    @Override public void close();
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c02dc59..756278a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.IgniteException;
@@ -2890,6 +2891,24 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteSemaphore semaphore(String name,
+       int cnt,
+       boolean fair,
+       boolean create) {
+        guard();
+
+        try {
+            return ctx.dataStructures().semaphore(name, cnt, fair, create);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public <T> IgniteQueue<T> queue(String name,
         int cap,
         CollectionConfiguration cfg)

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index ef2c543..220dec2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -42,6 +42,7 @@ import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteAtomicStamped;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteQueue;
@@ -89,6 +90,7 @@ import static org.apache.ignite.internal.processors.datastructures.DataStructure
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.COUNT_DOWN_LATCH;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.QUEUE;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SET;
+import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
@@ -131,6 +133,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     /** Cache contains only {@code GridCacheCountDownLatchValue}. */
     private IgniteInternalCache<GridCacheInternalKey, GridCacheCountDownLatchValue> cntDownLatchView;
 
+    /** Cache contains only {@code GridCacheSemaphoreState}. */
+    private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView;
+
     /** Cache contains only {@code GridCacheAtomicReferenceValue}. */
     private IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicReferenceValue> atomicRefView;
 
@@ -187,6 +192,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
             cntDownLatchView = atomicsCache;
 
+            semaphoreView = atomicsCache;
+
             atomicLongView = atomicsCache;
 
             atomicRefView = atomicsCache;
@@ -1176,6 +1183,133 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Gets or creates semaphore. If semaphore is not found in cache,
+     * it is created using provided name, count and fairness parameters.
+     *
+     * @param name Name of the semaphore.
+     * @param cnt Initial count.
+     * @param fair {@code True} to create a fair semaphore; stored in the semaphore state on creation.
+     * @param create If {@code true} semaphore will be created in case it is not in cache,
+     *      if it is {@code false} all parameters except {@code name} are ignored.
+     * @return Semaphore for the given name or {@code null} if it is not found and
+     *      {@code create} is false.
+     * @throws IgniteCheckedException If operation failed.
+     */
+    public IgniteSemaphore semaphore(final String name,
+                                               final int cnt,
+                                               final boolean fair,
+                                               final boolean create)
+            throws IgniteCheckedException
+    {
+        A.notNull(name, "name");
+
+        awaitInitialization();
+
+        if (create)
+            A.ensure(cnt >= 0, "count can not be negative");
+
+        checkAtomicsConfiguration();
+
+        startQuery();
+
+        return getAtomic(new IgniteOutClosureX<IgniteSemaphore>() {
+            @Override public IgniteSemaphore applyx() throws IgniteCheckedException {
+                GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class);
+
+                    // Check that semaphore hasn't been created in other thread yet.
+                    GridCacheSemaphoreEx semaphore = cast(dsMap.get(key), GridCacheSemaphoreEx.class);
+
+                    if (semaphore != null) {
+                        assert val != null;
+
+                        return semaphore;
+                    }
+
+                    if (val == null && !create)
+                        return null;
+
+                    if (val == null) {
+                        val = new GridCacheSemaphoreState(cnt, 0, fair); // Persist fairness with the state.
+
+                        dsView.put(key, val);
+                    }
+
+                    semaphore = new GridCacheSemaphoreImpl(name, val.getCnt(),
+                            val.isFair(), // Stored state wins over the argument, same as for the count.
+                            key,
+                            semaphoreView,
+                            dsCacheCtx);
+
+                    dsMap.put(key, semaphore);
+
+                    tx.commit();
+
+                    return semaphore;
+                }
+                catch (Error | Exception e) {
+                    dsMap.remove(key);
+
+                    U.error(log, "Failed to create semaphore: " + name, e);
+
+                    throw e;
+                }
+                finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        }, new DataStructureInfo(name, SEMAPHORE, null), create, GridCacheSemaphoreEx.class);
+    }
+
+    /**
+     * Removes semaphore from cache.
+     *
+     * @param name Name of the semaphore.
+     * @throws IgniteCheckedException If stored permit count is negative or operation failed.
+     */
+    public void removeSemaphore(final String name) throws IgniteCheckedException {
+        assert name != null;
+        assert dsCacheCtx != null;
+
+        awaitInitialization();
+
+        removeDataStructure(new IgniteOutClosureX<Void>() {
+            @Override
+            public Void applyx() throws IgniteCheckedException {
+                GridCacheInternal key = new GridCacheInternalKeyImpl(name);
+
+                dsCacheCtx.gate().enter();
+
+                try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
+                    // Check that the removable object has the expected type.
+                    GridCacheSemaphoreState val =
+                            cast(dsView.get(key), GridCacheSemaphoreState.class);
+
+                    if (val != null) {
+                        if (val.getCnt() < 0) { // NOTE(review): checks count, not waiters - confirm.
+                            throw new IgniteCheckedException("Failed to remove semaphore " +
+                                    "with blocked threads. ");
+                        }
+
+                        dsView.remove(key);
+
+                        tx.commit();
+                    } else
+                        tx.setRollbackOnly();
+
+                    return null;
+                } finally {
+                    dsCacheCtx.gate().leave();
+                }
+            }
+        }, name, SEMAPHORE, null);
+    }
+
+    /**
      * Remove internal entry by key from cache.
      *
      * @param key Internal entry key.
@@ -1222,7 +1356,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         /** {@inheritDoc} */
         @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
             if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED)
-                return evt.getValue() instanceof GridCacheCountDownLatchValue;
+                return evt.getValue() instanceof GridCacheCountDownLatchValue ||
+                        evt.getValue() instanceof GridCacheSemaphoreState;
             else {
                 assert evt.getEventType() == EventType.REMOVED : evt;
 
@@ -1297,6 +1432,25 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                                 ", actual=" + latch.getClass() + ", value=" + latch + ']');
                         }
                     }
+                    else if(val0 instanceof GridCacheSemaphoreState) {
+                        GridCacheInternalKey key = evt.getKey();
+
+                        // Notify semaphore on changes.
+                        final GridCacheRemovable semaphore = dsMap.get(key);
+
+                        GridCacheSemaphoreState val = (GridCacheSemaphoreState)val0;
+
+                        if (semaphore instanceof GridCacheSemaphoreEx) {
+                            final GridCacheSemaphoreEx semaphore0 = (GridCacheSemaphoreEx)semaphore;
+
+                            semaphore0.onUpdate(val);
+                        }
+                        else if (semaphore != null) {
+                            U.error(log, "Failed to cast object " +
+                                    "[expected=" + IgniteSemaphore.class.getSimpleName() +
+                                    ", actual=" + semaphore.getClass() + ", value=" + semaphore + ']');
+                        }
+                    }
 
                 }
                 else {
@@ -1514,7 +1668,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         QUEUE(IgniteQueue.class.getSimpleName()),
 
         /** */
-        SET(IgniteSet.class.getSimpleName());
+        SET(IgniteSet.class.getSimpleName()),
+
+        /** */
+        SEMAPHORE(IgniteSemaphore.class.getSimpleName());
 
         /** */
         private static final DataStructureType[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
new file mode 100644
index 0000000..0f939d5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
@@ -0,0 +1,22 @@
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.IgniteSemaphore;
+
+/**
+ * Internal view of {@link IgniteSemaphore} exposing its cache key and an update callback.
+ */
+public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovable {
+    /**
+     * Get current semaphore key.
+     *
+     * @return Semaphore key.
+     */
+    public GridCacheInternalKey key();
+
+    /**
+     * Callback to notify semaphore on changes.
+     *
+     * @param val Updated semaphore state (permit count and waiters count).
+     */
+    public void onUpdate(GridCacheSemaphoreState val);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
new file mode 100644
index 0000000..24c3ec5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -0,0 +1,619 @@
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Cache semaphore implementation based on AbstractQueuedSynchronizer.
+ * Current implementation supports only unfair and locally fair modes.
+ * When fairness set false, this class makes no guarantees about the order in which threads acquire permits.
+ * When fairness is set true, the semaphore only guarantees that local threads invoking any of the acquire methods
+ * are selected to obtain permits in the order in which their invocation of those methods was processed (FIFO).
+ *
+ * @author Vladisav Jelisavcic
+ */
+public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Deserialization stash. */
+    private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
+            new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
+                @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
+                    return F.t2();
+                }
+            };
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Semaphore name. */
+    private String name;
+
+    /** Removed flag.*/
+    private volatile boolean rmvd;
+
+    /** Semaphore key. */
+    private GridCacheInternalKey key;
+
+    /** Semaphore projection. */
+    private IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView;
+
+    /** Cache context. */
+    private GridCacheContext ctx;
+
+    /** Fairness flag. */
+    private boolean isFair;
+
+    /** Initial count. */
+    private transient final int initCnt;
+
+    /** Initialization guard. */
+    private final AtomicBoolean initGuard = new AtomicBoolean();
+
+    /** Initialization latch. */
+    private final CountDownLatch initLatch = new CountDownLatch(1);
+
+    /** Internal synchronization object. */
+    private Sync sync;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridCacheSemaphoreImpl() {
+        // No-op.
+        initCnt = 0;
+    }
+
+    /**
+     * Synchronization implementation for semaphore.  Uses AQS state
+     * to represent permits. Subclassed into fair and nonfair
+     * versions.
+     */
+    abstract class Sync extends AbstractQueuedSynchronizer {
+        private static final long serialVersionUID = 1192457210091910933L;
+
+        protected ConcurrentMap<Thread,Integer> threadMap;
+        protected int totalWaiters;
+
+        Sync(int permits) {
+            setState(permits);
+            threadMap = new ConcurrentHashMap<>();
+        }
+
+        protected synchronized void setWaiters(int waiters){
+            totalWaiters = waiters;
+        }
+
+        public synchronized int getWaiters() { // Same monitor as setWaiters(), so the latest value is visible.
+            return totalWaiters;
+        }
+
+        final synchronized void setPermits(int permits){
+            setState(permits);
+        }
+
+        final int getPermits() {
+            return getState();
+        }
+
+        final int nonfairTryAcquireShared(int acquires) {
+            for (;;) {
+                int available = getState();
+                int remaining = available - acquires;
+
+                if (remaining < 0 || compareAndSetGlobalState(available, remaining)) {
+                    if(remaining < 0){
+                        if(!threadMap.containsKey(Thread.currentThread()))
+                            getAndIncWaitingCount();
+                    }
+
+                    return remaining;
+                }
+            }
+        }
+
+        protected final boolean tryReleaseShared(int releases) {
+            // Check if some other node updated the state.
+            // This method is called with release==0 only when trying to wake through update.
+            if(releases == 0)
+                return true;
+
+            for (;;) {
+                int current = getState();
+                int next = current + releases;
+                if (next < current) // overflow
+                    throw new Error("Maximum permit count exceeded");
+                if (compareAndSetGlobalState(current, next))
+                    return true;
+            }
+        }
+
+        final void reducePermits(int reductions) {
+            for (;;) {
+                int current = getState();
+                int next = current - reductions;
+                if (next > current) // underflow
+                    throw new Error("Permit count underflow");
+                if (compareAndSetGlobalState(current, next))
+                    return;
+            }
+        }
+
+        final int drainPermits() {
+            for (;;) {
+                int current = getState();
+                if (current == 0 || compareAndSetGlobalState(current, 0))
+                    return current;
+            }
+        }
+
+        /** Records the current thread in {@code threadMap} and increments the waiters count in the cached state. */
+        protected void getAndIncWaitingCount() {
+            try {
+                CU.outTx(
+                        retryTopologySafe(new Callable<Boolean>() {
+                            @Override public Boolean call() throws Exception {
+                                try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                                    GridCacheSemaphoreState val = semaphoreView.get(key);
+
+                                    if (val == null)
+                                        throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+
+                                    int waiting = val.getWaiters();
+                                    sync.threadMap.put(Thread.currentThread(), waiting);
+
+                                    waiting++;
+                                    val.setWaiters(waiting);
+                                    semaphoreView.put(key, val);
+                                    tx.commit();
+
+                                    return true;
+                                } catch (Error | Exception e) {
+                                    U.error(log, "Failed to increment waiters count for semaphore: " + name, e);
+
+                                    throw e;
+                                }
+                            }
+                        }),
+                        ctx
+                );
+            } catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+        }
+
+        protected boolean compareAndSetGlobalState(final int expVal, final int newVal) {
+            try {
+                return CU.outTx(
+                        retryTopologySafe(new Callable<Boolean>() {
+                            @Override
+                            public Boolean call() throws Exception {
+                                try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                                    GridCacheSemaphoreState val = semaphoreView.get(key);
+
+                                    if (val == null)
+                                        throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+
+                                    boolean retVal = val.getCnt() == expVal;
+
+                                    if (retVal) {
+                                        /* If current thread is queued, than this call is the call that is going to be unblocked. */
+                                        if(sync.isQueued(Thread.currentThread())) {
+
+                                            int waiting = val.getWaiters() - 1;
+                                            val.setWaiters(waiting);
+
+                                            sync.threadMap.remove(Thread.currentThread());
+                                        }
+
+                                        val.setCnt(newVal);
+
+                                        semaphoreView.put(key, val);
+                                        tx.commit();
+                                    }
+
+                                    return retVal;
+                                } catch (Error | Exception e) {
+                                    U.error(log, "Failed to compare and set: " + this, e);
+
+                                    throw e;
+                                }
+                            }
+                        }),
+                        ctx
+                );
+            } catch( IgniteCheckedException e){
+                throw U.convertException(e);
+            }
+        }
+    }
+
+    /**
+     * NonFair version
+     */
+    final class NonfairSync extends Sync {
+        private static final long serialVersionUID = 7983135489326435495L;
+
+        NonfairSync(int permits) {
+            super(permits);
+        }
+
+        protected int tryAcquireShared(int acquires) {
+            return nonfairTryAcquireShared(acquires);
+        }
+    }
+
+    /**
+     * Fair version
+     */
+    final class FairSync extends Sync {
+        private static final long serialVersionUID = 3468129658421667L;
+
+        FairSync(int permits) {
+            super(permits);
+        }
+
+        protected int tryAcquireShared(int acquires) {
+            for (;;) {
+                if (hasQueuedPredecessors())
+                    return -1;
+
+                int available = getState();
+                int remaining = available - acquires;
+
+                if (remaining < 0 || compareAndSetGlobalState(available, remaining)) {
+                    if(remaining < 0){
+                        if(!threadMap.containsKey(Thread.currentThread()))
+                            getAndIncWaitingCount();
+                    }
+                    return remaining;
+                }
+            }
+        }
+
+
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param name Semaphore name.
+     * @param initCnt Initial count.
+     * @param key Semaphore key.
+     * @param semaphoreView Semaphore projection.
+     * @param ctx Cache context.
+     */
+    public GridCacheSemaphoreImpl(String name,
+                                       int initCnt,
+                                       boolean fair,
+                                       GridCacheInternalKey key,
+                                       IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView,
+                                       GridCacheContext ctx)
+    {
+        assert name != null;
+        assert key != null;
+        assert semaphoreView != null;
+        assert ctx != null;
+
+        this.name = name;
+        this.initCnt = initCnt;
+        this.key = key;
+        this.semaphoreView = semaphoreView;
+        this.ctx = ctx;
+        this.isFair = fair;
+
+        log = ctx.logger(getClass());
+    }
+
+    /**
+     * Lazily creates the internal sync object from the cached semaphore state; done by one thread only.
+     * @throws IgniteCheckedException If operation failed or the semaphore state is missing in cache.
+     */
+    private void initializeSemaphore() throws IgniteCheckedException {
+        if (initGuard.compareAndSet(false, true)) {
+            try {
+                sync = CU.outTx(
+                        retryTopologySafe(new Callable<Sync>() {
+                            @Override public Sync call() throws Exception {
+                                try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                                    GridCacheSemaphoreState val = semaphoreView.get(key);
+
+                                    if (val == null) {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Failed to find semaphore with given name: " + name);
+
+                                        return null;
+                                    }
+
+                                    final int count = val.getCnt();
+                                    tx.commit();
+
+                                    return val.isFair() ? new FairSync(count) : new NonfairSync(count);
+                                }
+                            }
+                        }),
+                        ctx
+                );
+                if (log.isDebugEnabled())
+                    log.debug("Initialized internal sync structure: " + sync);
+            }
+            finally {
+                initLatch.countDown();
+            }
+        }
+        else
+            U.await(initLatch);
+
+        // Checked on both paths: the initializing thread must not continue with a null sync either.
+        if (sync == null)
+            throw new IgniteCheckedException("Internal semaphore has not been properly initialized.");
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheInternalKey key() { return key; }
+
+    /** {@inheritDoc} */
+    @Override public boolean removed(){ return rmvd; }
+
+    /** {@inheritDoc} */
+    @Override public boolean onRemoved() {
+        return rmvd = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onUpdate(GridCacheSemaphoreState val) {
+        if(sync == null)
+            return;
+
+        // Update permission count.
+        sync.setPermits(val.getCnt());
+
+        // Update waiters count.
+        sync.setWaiters(val.getWaiters());
+
+        // Try to notify any waiting threads.
+        sync.releaseShared(0);
+    }
+
+    @Override
+    public void needCheckNotRemoved() {
+        // No-op.
+    }
+
+    @Override
+    public void acquire() throws IgniteException {
+        acquire(1);
+    }
+
+    @Override
+    public void acquire(int permits) throws IgniteInterruptedException {
+        A.ensure(permits >= 0, "Number of permits must be non-negative.");
+        try {
+            initializeSemaphore();
+            sync.acquireSharedInterruptibly(permits);
+
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        } catch (InterruptedException e) {
+            throw new IgniteInterruptedException(e);
+        }
+    }
+
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Single-permit shortcut for {@link #acquireUninterruptibly(int)}, mirroring how
+     * {@link #acquire()} delegates to {@link #acquire(int)}.
+     */
+    @Override
+    public void acquireUninterruptibly() {
+        acquireUninterruptibly(1);
+    }
+
+    @Override
+    public void acquireUninterruptibly(int permits) {
+        A.ensure(permits >= 0, "Number of permits must be non-negative.");
+        try {
+            initializeSemaphore();
+            sync.acquireShared(permits);
+
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    @Override
+    public int availablePermits(){
+        int ret;
+        try {
+            initializeSemaphore();
+            ret = CU.outTx(
+                    retryTopologySafe(new Callable<Integer>() {
+                        @Override
+                        public Integer call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                                GridCacheSemaphoreState val = semaphoreView.get(key);
+
+                                if (val == null)
+                                    throw new IgniteException("Failed to find semaphore with given name: " + name);
+
+                                int count = val.getCnt();
+                                tx.rollback();
+
+                                return count;
+                            }
+                        }
+                    }),
+                    ctx
+            );
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        return ret;
+    }
+
+    @Override
+    public int drainPermits() {
+        try {
+            initializeSemaphore();
+            return sync.drainPermits();
+
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to {@link #tryAcquire(int)} with a single permit, the same way
+     * {@link #acquire()} and {@link #release()} delegate to their counted variants.
+     */
+    @Override
+    public boolean tryAcquire() {
+        return tryAcquire(1);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Single-permit shortcut for {@link #tryAcquire(int, long, TimeUnit)}: waits up to the
+     * given timeout for one permit.
+     *
+     * @throws IgniteException If operation failed; interruption is reported as {@link IgniteInterruptedException}.
+     */
+    @Override
+    public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
+        return tryAcquire(1, timeout, unit);
+    }
+
+    @Override
+    public void release() {
+        release(1);
+    }
+
+    @Override
+    public void release(int permits) {
+        A.ensure(permits >= 0, "Number of permits must be non-negative.");
+        try {
+            initializeSemaphore();
+            sync.releaseShared(permits);
+
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    @Override
+    public boolean tryAcquire(int permits) {
+        A.ensure(permits >= 0, "Number of permits must be non-negative.");
+        try {
+            initializeSemaphore();
+            return sync.nonfairTryAcquireShared(permits) >= 0; // NOTE(review): a failed attempt is counted as a waiter.
+
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    @Override
+    public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws IgniteInterruptedException {
+        A.ensure(permits >= 0, "Number of permits must be non-negative.");
+        try {
+            initializeSemaphore();
+            return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
+
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        } catch (InterruptedException e) {
+            throw new IgniteInterruptedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isFair() {
+        return isFair;
+    }
+
+    @Override
+    public boolean hasQueuedThreads() {
+        try {
+            initializeSemaphore();
+            return sync.getWaiters()!=0;
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    @Override
+    public int getQueueLength() {
+        try {
+            initializeSemaphore();
+            return sync.getWaiters();
+        } catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(ctx.kernalContext());
+        out.writeUTF(name);
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        IgniteBiTuple<GridKernalContext, String> t = stash.get();
+        // NOTE(review): values are only stashed here; this class defines no readResolve() to consume them - confirm.
+        t.set1((GridKernalContext)in.readObject());
+        t.set2(in.readUTF());
+    }
+
+    @Override
+    public void close() {
+        if (!rmvd) {
+            try {
+                ctx.kernalContext().dataStructures().removeSemaphore(name);
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+        }
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheSemaphoreImpl.class, this);
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
new file mode 100644
index 0000000..cf44b7d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -0,0 +1,128 @@
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+
+/**
+ * Grid cache semaphore state: permission count, waiters value and fairness flag.
+ *
+ * @author Vladisav Jelisavcic
+ */
+public class GridCacheSemaphoreState implements GridCacheInternal, Externalizable, Cloneable {
+    /** Serialization version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Permission count.
+     */
+    private int cnt;
+
+    /**
+     * Waiters value; presumably the number of waiters (previously documented as "Waiter id") - TODO confirm.
+     */
+    private int waiters;
+
+    /**
+     * Fairness flag.
+     */
+    private boolean fair;
+
+
+    /**
+     * Creates a state whose fairness flag is {@code false}.
+     * @param cnt Number of permissions.
+     * @param waiters Initial value of the waiters field.
+     */
+    public GridCacheSemaphoreState(int cnt, int waiters) {
+        this.cnt = cnt;
+        this.waiters = waiters;
+        this.fair = false;
+    }
+
+    /** Creates a state with an explicit fairness flag.
+     * @param cnt Number of permissions.
+     * @param waiters Initial value of the waiters field.
+     * @param fair Fairness flag.
+     */
+    public GridCacheSemaphoreState(int cnt, int waiters, boolean fair) {
+        this.cnt = cnt;
+        this.waiters = waiters;
+        this.fair = fair;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridCacheSemaphoreState() {
+        // No-op.
+    }
+
+    /**
+     * @param cnt New count.
+     */
+    public void setCnt(int cnt) {
+        this.cnt = cnt;
+    }
+
+    /**
+     * @return Current count.
+     */
+    public int getCnt() {
+        return cnt;
+    }
+
+    public int getWaiters() { // Returns the current value of the waiters field.
+        return waiters;
+    }
+
+    public void setWaiters(int id) { // Stores the argument in the waiters field, although the parameter is named "id".
+        this.waiters = id;
+    }
+
+    public boolean isFair() { // Returns the fairness flag; this class has no setter for it.
+        return fair;
+    }
+
+    /**
+     * {@inheritDoc} Delegates to {@code super.clone()}; every field of this class is a primitive.
+     */
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
+    /**
+     * {@inheritDoc} Writes {@code cnt}, {@code waiters} and {@code fair}, in that order.
+     */
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(cnt);
+        out.writeInt(waiters);
+        out.writeBoolean(fair);
+    }
+
+    /**
+     * {@inheritDoc} Reads {@code cnt}, {@code waiters} and {@code fair} in the order used by writeExternal.
+     */
+    @Override
+    public void readExternal(ObjectInput in) throws IOException {
+        cnt = in.readInt();
+        waiters = in.readInt();
+        fair = in.readBoolean();
+    }
+
+    /**
+     * {@inheritDoc} The string is produced by {@code S.toString} for this class and instance.
+     */
+    @Override
+    public String toString() {
+        return S.toString(GridCacheSemaphoreState.class, this);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
new file mode 100644
index 0000000..689b647
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
@@ -0,0 +1,115 @@
+package org.apache.ignite.internal.processors.datastructures;
+
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Semaphore value: a signed permission count plus a semaphore ID (created by vladisav on 20.9.15).
+ */
+public class GridCacheSemaphoreValue implements GridCacheInternal, Externalizable, Cloneable {
+    /** Serialization version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Permission count; its sign is what isRelease() and isAwait() test.
+     */
+    private int cnt;
+
+    /**
+     * Semaphore ID.
+     */
+    private long semaphoreId;
+
+    /**
+     * Constructor.
+     *
+     * @param cnt Number of permissions.
+     * @param semaphoreId Semaphore ID.
+     */
+    public GridCacheSemaphoreValue(int cnt, long semaphoreId) {
+        this.cnt = cnt;
+
+        this.semaphoreId = semaphoreId;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridCacheSemaphoreValue() {
+        // No-op.
+    }
+
+    /**
+     * @param cnt New count.
+     */
+    public void set(int cnt) {
+        this.cnt = cnt;
+    }
+
+    /**
+     * @return Current count.
+     */
+    public int get() {
+        return cnt;
+    }
+
+    /**
+     * @return {@code true} if the count is positive, i.e. permissions are to be added.
+     */
+    public boolean isRelease(){
+        return cnt>0;
+    }
+
+    /**
+     * @return {@code true} if the count is negative, i.e. the permission count should be lowered.
+     */
+    public boolean isAwait(){
+        return cnt<0;
+    }
+
+    /**
+     * @return Semaphore ID.
+     */
+    public long semaphoreId() {
+        return semaphoreId;
+    }
+
+    /**
+     * {@inheritDoc} Delegates to {@code super.clone()}; every field of this class is a primitive.
+     */
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
+    /**
+     * {@inheritDoc} Writes {@code cnt} and then {@code semaphoreId}.
+     */
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(cnt);
+        out.writeLong(semaphoreId);
+    }
+
+    /**
+     * {@inheritDoc} Reads {@code cnt} and then {@code semaphoreId}, the order used by writeExternal.
+     */
+    @Override
+    public void readExternal(ObjectInput in) throws IOException {
+        cnt = in.readInt();
+        semaphoreId = in.readLong();
+    }
+
+    /**
+     * {@inheritDoc} The string is produced by {@code S.toString} for this class and instance.
+     */
+    @Override
+    public String toString() {
+        return S.toString(GridCacheSemaphoreValue.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index 964753d..b633a6a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCluster;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.IgniteFileSystem;
@@ -308,6 +309,15 @@ public class IgniteMock implements Ignite {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteSemaphore semaphore(String name,
+        int cnt,
+        boolean fair,
+        boolean create)
+    {
+        return null; // The mock ignores all arguments and always returns null.
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public <T> IgniteQueue<T> queue(String name,
         int cap,
         CollectionConfiguration cfg)

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index ec7dab7..495fd6b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -34,6 +34,7 @@ import org.apache.ignite.IgniteAtomicStamped;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.IgniteException;
@@ -536,6 +537,12 @@ public class IgniteProcessProxy implements IgniteEx {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteSemaphore semaphore(String name, int cnt, boolean fair,
+        boolean create) throws IgniteException {
+        throw new UnsupportedOperationException("Operation isn't supported yet."); // Always throws; the arguments are never used.
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> IgniteQueue<T> queue(String name, int cap,
         @Nullable CollectionConfiguration cfg) throws IgniteException {
         throw new UnsupportedOperationException("Operation isn't supported yet.");

http://git-wip-us.apache.org/repos/asf/ignite/blob/e9567ade/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
index 42514e3..4867a10 100644
--- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
+++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
@@ -398,6 +398,17 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteSemaphore semaphore(String name,
+        int cnt,
+        boolean fair,
+        boolean create)
+    {
+        assert g != null; // g is the delegate that receives the call below; it must be set first.
+
+        return g.semaphore(name, cnt, fair, create); // Arguments are forwarded unchanged.
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public <T> IgniteQueue<T> queue(String name,
         int cap,
         CollectionConfiguration cfg)


Mime
View raw message