ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [2/6] ignite git commit: Fixes formatting issues;
Date Fri, 23 Oct 2015 16:10:43 GMT
Fixes formatting issues;


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/be332a82
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/be332a82
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/be332a82

Branch: refs/heads/ignite-638
Commit: be332a82711ddd7e9088e00d7f26edd7de407a11
Parents: e9567ad
Author: Vladisav Jelisavcic <vladisavj@gmail.com>
Authored: Thu Oct 1 20:19:32 2015 +0200
Committer: vladisav <vladisavj@gmail.com>
Committed: Thu Oct 1 20:19:32 2015 +0200

----------------------------------------------------------------------
 .../datastructures/IgniteSemaphoreExample.java  |  68 ++--
 .../java/org/apache/ignite/IgniteSemaphore.java | 396 +++++++------------
 .../datastructures/GridCacheSemaphoreEx.java    |   6 +-
 .../datastructures/GridCacheSemaphoreImpl.java  | 346 +++++++++-------
 .../datastructures/GridCacheSemaphoreState.java |   9 +-
 .../datastructures/GridCacheSemaphoreValue.java | 115 ------
 6 files changed, 396 insertions(+), 544 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
index 2ef242c..5849f5f 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java
@@ -1,22 +1,20 @@
 package org.apache.ignite.examples.datastructures;
 
-import org.apache.ignite.*;
-import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.examples.ExampleNodeStartup;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.lang.IgniteRunnable;
 
 /**
- * This example demonstrates cache based semaphore.
- * <p>
- * Remote nodes should always be started with special configuration file which
- * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
- * <p>
- * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
- * start node with {@code examples/config/example-ignite.xml} configuration.
- *
- * @author Vladisav Jelisavcic
+ * This example demonstrates cache based semaphore. <p> Remote nodes should always be started with special configuration
+ * file which enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. <p> Alternatively
+ * you can run {@link ExampleNodeStartup} in another JVM which will start node with {@code
+ * examples/config/example-ignite.xml} configuration.
  */
 public class IgniteSemaphoreExample {
     /** Cache name. */
@@ -40,7 +38,7 @@ public class IgniteSemaphoreExample {
             System.out.println(">>> Cache atomic semaphore example started.");
 
             // Initialize semaphore.
-            IgniteSemaphore syncSemaphore = ignite.semaphore(syncName,0,false,true);
+            IgniteSemaphore syncSemaphore = ignite.semaphore(syncName, 0, false, true);
 
             // Make name of semaphore.
             final String semaphoreName = UUID.randomUUID().toString();
@@ -50,21 +48,25 @@ public class IgniteSemaphoreExample {
 
             // Make shared resource
             final String resourceName = UUID.randomUUID().toString();
+
+            // Get cache view where the resource will be held
             IgniteCache<String, Queue<String>> cache = ignite.getOrCreateCache(CACHE_NAME);
+
+            // Put the resource queue the cache
             cache.put(resourceName, new LinkedList<>());
 
             // Initialize semaphore.
             IgniteSemaphore semaphore = ignite.semaphore(semaphoreName, 0, false, true);
 
             // Initialize mutex.
-            IgniteSemaphore mutex = ignite.semaphore(mutexName,1,false,true);
+            IgniteSemaphore mutex = ignite.semaphore(mutexName, 1, false, true);
 
             // Start consumers on all cluster nodes.
             for (int i = 0; i < NUM_CONSUMERS; i++)
                 ignite.compute().withAsync().run(new Consumer(mutexName, semaphoreName, resourceName));
 
             // Start producers on all cluster nodes.
-            for(int i = 0; i < NUM_PRODUCERS; i++)
+            for (int i = 0; i < NUM_PRODUCERS; i++)
                 ignite.compute().withAsync().run(new Producer(mutexName, semaphoreName, resourceName));
 
             System.out.println("Master node is waiting for all other nodes to finish...");
@@ -121,23 +123,33 @@ public class IgniteSemaphoreExample {
         /** {@inheritDoc} */
         @Override public void run() {
             IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
-            IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName,0,true,true);
+            IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName, 0, true, true);
 
-            for(int i=0;i<ITEM_COUNT;i++) {
+            for (int i = 0; i < ITEM_COUNT; i++) {
+                // Mutex is used to access shared resource.
                 mutex.acquire();
 
-                Queue<String> queue = (Queue<String>) Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+                Queue<String> queue = (Queue<String>)Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+
                 queue.add(Ignition.ignite().cluster().localNode().id().toString());
+
                 Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
+
                 System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] produced data. Available: " + semaphore.availablePermits());
 
+                // Mutex is released for others to access the resource.
                 mutex.release();
 
+                // Signals others that shared resource is available.
                 semaphore.release();
             }
 
             System.out.println("Producer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
-            IgniteSemaphore sync =  Ignition.ignite().semaphore(syncName, 0, true, true);
+
+            // Gets the syncing semaphore
+            IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 0, true, true);
+
+            // Signals the master thread
             sync.release();
         }
     }
@@ -159,23 +171,33 @@ public class IgniteSemaphoreExample {
         /** {@inheritDoc} */
         @Override public void run() {
             IgniteSemaphore semaphore = Ignition.ignite().semaphore(semaphoreName, 0, true, true);
-            IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName,0,true,true);
+            IgniteSemaphore mutex = Ignition.ignite().semaphore(mutexName, 0, true, true);
 
-            for(int i=0;i<ITEM_COUNT;i++) {
+            for (int i = 0; i < ITEM_COUNT; i++) {
+                // Block if queue is empty.
                 semaphore.acquire();
 
+                // Mutex is used to access shared resource.
                 mutex.acquire();
 
-                Queue<String> queue = (Queue<String>) Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+                Queue<String> queue = (Queue<String>)Ignition.ignite().cache(CACHE_NAME).get(resourceName);
+
                 String data = queue.remove();
+
                 Ignition.ignite().cache(CACHE_NAME).put(resourceName, queue);
+
                 System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] consumed data generated by producer [nodeId=" + data + "]");
 
+                // Signals others that shared resource is available.
                 mutex.release();
             }
 
             System.out.println("Consumer [nodeId=" + Ignition.ignite().cluster().localNode().id() + "] finished. ");
-            IgniteSemaphore sync =  Ignition.ignite().semaphore(syncName, 3, true, true);
+
+            // Gets the syncing semaphore
+            IgniteSemaphore sync = Ignition.ignite().semaphore(syncName, 3, true, true);
+
+            // Signals the master thread
             sync.release();
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
index 0e29d00..5a4b377 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
@@ -4,17 +4,12 @@ import java.io.Closeable;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This interface provides a rich API for working with distributed semaphore.
- * <p>
- * <h1 class="header">Functionality</h1>
- * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}.
- * <h1 class="header">Creating Distributed Semaphore</h1>
- * Instance of cache semaphore can be created by calling the following method:
- * {@link Ignite#semaphore(String, int, boolean, boolean)}.
- *
- * @author Vladisav Jelisavcic
+ * This interface provides a rich API for working with distributed semaphore. <p> <h1 class="header">Functionality</h1>
+ * Distributed semaphore provides functionality similar to {@code java.util.concurrent.Semaphore}. <h1
+ * class="header">Creating Distributed Semaphore</h1> Instance of cache semaphore can be created by calling the
+ * following method: {@link Ignite#semaphore(String, int, boolean, boolean)}.
  */
-public interface IgniteSemaphore extends Closeable{
+public interface IgniteSemaphore extends Closeable {
     /**
      * Gets name of the semaphore.
      *
@@ -23,151 +18,110 @@ public interface IgniteSemaphore extends Closeable{
     public String name();
 
     /**
-     * Acquires a permit from this semaphore, blocking until one is
-     * available, or the thread is {@linkplain Thread#interrupt interrupted}.
-     *
-     * <p>Acquires a permit, if one is available and returns immediately,
-     * reducing the number of available permits by one.
-     *
-     * <p>If no permit is available then the current thread becomes
-     * disabled for thread scheduling purposes and lies dormant until
-     * one of two things happens:
-     * <ul>
-     * <li>Some other thread invokes the {@link #release} method for this
-     * semaphore and the current thread is next to be assigned a permit; or
-     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
-     * the current thread.
-     * </ul>
-     *
-     * <p>If the current thread:
-     * <ul>
-     * <li>has its interrupted status set on entry to this method; or
-     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
-     * for a permit,
-     * </ul>
-     * then {@link IgniteInterruptedException} is thrown and the current thread's
-     * interrupted status is cleared.
+     * Acquires a permit from this semaphore, blocking until one is available, or the thread is {@linkplain
+     * Thread#interrupt interrupted}.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately, reducing the number of available permits by
+     * one.
+     *
+     * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+     * dormant until one of two things happens: <ul> <li>Some other thread invokes the {@link #release} method for this
+     * semaphore and the current thread is next to be assigned a permit; or <li>Some other thread {@linkplain
+     * Thread#interrupt interrupts} the current thread. </ul>
+     *
+     * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+     * Thread#interrupt interrupted} while waiting for a permit, </ul> then {@link IgniteInterruptedException} is thrown
+     * and the current thread's interrupted status is cleared.
      *
      * @throws IgniteInterruptedException if the current thread is interrupted
      */
     public void acquire() throws IgniteInterruptedException;
 
     /**
-     * Acquires a permit from this semaphore, blocking until one is
-     * available.
-     *
-     * <p>Acquires a permit, if one is available and returns immediately,
-     * reducing the number of available permits by one.
-     *
-     * <p>If no permit is available then the current thread becomes
-     * disabled for thread scheduling purposes and lies dormant until
-     * some other thread invokes the {@link #release} method for this
-     * semaphore and the current thread is next to be assigned a permit.
-     *
-     * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
-     * while waiting for a permit then it will continue to wait, but the
-     * time at which the thread is assigned a permit may change compared to
-     * the time it would have received the permit had no interruption
-     * occurred.  When the thread does return from this method its interrupt
-     * status will be set.
+     * Acquires a permit from this semaphore, blocking until one is available.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately, reducing the number of available permits by
+     * one.
+     *
+     * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+     * dormant until some other thread invokes the {@link #release} method for this semaphore and the current thread is
+     * next to be assigned a permit.
+     *
+     * <p>If the current thread is {@linkplain Thread#interrupt interrupted} while waiting for a permit then it will
+     * continue to wait, but the time at which the thread is assigned a permit may change compared to the time it would
+     * have received the permit had no interruption occurred.  When the thread does return from this method its
+     * interrupt status will be set.
      */
     public void acquireUninterruptibly();
 
     /**
-     * Acquires a permit from this semaphore, only if one is available at the
-     * time of invocation.
-     *
-     * <p>Acquires a permit, if one is available and returns immediately,
-     * with the value {@code true},
-     * reducing the number of available permits by one.
-     *
-     * <p>If no permit is available then this method will return
-     * immediately with the value {@code false}.
-     *
-     * <p>Even when this semaphore has been set to use a
-     * fair ordering policy, a call to {@code tryAcquire()} <em>will</em>
-     * immediately acquire a permit if one is available, whether or not
-     * other threads are currently waiting.
-     * This &quot;barging&quot; behavior can be useful in certain
-     * circumstances, even though it breaks fairness. If you want to honor
-     * the fairness setting, then use
-     * {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) }
-     * which is almost equivalent (it also detects interruption).
-     *
-     * @return {@code true} if a permit was acquired and {@code false}
-     *         otherwise
+     * Acquires a permit from this semaphore, only if one is available at the time of invocation.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately, with the value {@code true}, reducing the
+     * number of available permits by one.
+     *
+     * <p>If no permit is available then this method will return immediately with the value {@code false}.
+     *
+     * <p>Even when this semaphore has been set to use a fair ordering policy, a call to {@code tryAcquire()}
+     * <em>will</em> immediately acquire a permit if one is available, whether or not other threads are currently
+     * waiting. This &quot;barging&quot; behavior can be useful in certain circumstances, even though it breaks
+     * fairness. If you want to honor the fairness setting, then use {@link #tryAcquire(long, TimeUnit) tryAcquire(0,
+     * TimeUnit.SECONDS) } which is almost equivalent (it also detects interruption).
+     *
+     * @return {@code true} if a permit was acquired and {@code false} otherwise
      */
     public boolean tryAcquire();
 
     /**
-     * Acquires a permit from this semaphore, if one becomes available
-     * within the given waiting time and the current thread has not
-     * been {@linkplain Thread#interrupt interrupted}.
-     *
-     * <p>Acquires a permit, if one is available and returns immediately,
-     * with the value {@code true},
-     * reducing the number of available permits by one.
-     *
-     * <p>If no permit is available then the current thread becomes
-     * disabled for thread scheduling purposes and lies dormant until
-     * one of three things happens:
-     * <ul>
-     * <li>Some other thread invokes the {@link #release} method for this
-     * semaphore and the current thread is next to be assigned a permit; or
-     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
-     * the current thread; or
-     * <li>The specified waiting time elapses.
-     * </ul>
+     * Acquires a permit from this semaphore, if one becomes available within the given waiting time and the current
+     * thread has not been {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires a permit, if one is available and returns immediately, with the value {@code true}, reducing the
+     * number of available permits by one.
+     *
+     * <p>If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies
+     * dormant until one of three things happens: <ul> <li>Some other thread invokes the {@link #release} method for
+     * this semaphore and the current thread is next to be assigned a permit; or <li>Some other thread {@linkplain
+     * Thread#interrupt interrupts} the current thread; or <li>The specified waiting time elapses. </ul>
      *
      * <p>If a permit is acquired then the value {@code true} is returned.
      *
-     * <p>If the current thread:
-     * <ul>
-     * <li>has its interrupted status set on entry to this method; or
-     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
-     * to acquire a permit,
-     * </ul>
-     * then {@link IgniteInterruptedException} is thrown and the current thread's
-     * interrupted status is cleared.
+     * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+     * Thread#interrupt interrupted} while waiting to acquire a permit, </ul> then {@link IgniteInterruptedException} is
+     * thrown and the current thread's interrupted status is cleared.
      *
-     * <p>If the specified waiting time elapses then the value {@code false}
-     * is returned.  If the time is less than or equal to zero, the method
-     * will not wait at all.
+     * <p>If the specified waiting time elapses then the value {@code false} is returned.  If the time is less than or
+     * equal to zero, the method will not wait at all.
      *
      * @param timeout the maximum time to wait for a permit
      * @param unit the time unit of the {@code timeout} argument
-     * @return {@code true} if a permit was acquired and {@code false}
-     *         if the waiting time elapsed before a permit was acquired
+     * @return {@code true} if a permit was acquired and {@code false} if the waiting time elapsed before a permit was
+     * acquired
      * @throws IgniteInterruptedException if the current thread is interrupted
      */
     public boolean tryAcquire(long timeout, TimeUnit unit)
-            throws IgniteInterruptedException;
+        throws IgniteInterruptedException;
 
     /**
-     * Acquires the given number of permits from this semaphore,
-     * blocking until all are available.
+     * Acquires the given number of permits from this semaphore, blocking until all are available.
      *
-     * <p>Acquires the given number of permits, if they are available,
-     * and returns immediately, reducing the number of available permits
-     * by the given amount.
+     * <p>Acquires the given number of permits, if they are available, and returns immediately, reducing the number of
+     * available permits by the given amount.
      *
-     * <p>If insufficient permits are available then the current thread becomes
-     * disabled for thread scheduling purposes and lies dormant until
-     * some other thread invokes one of the {@link #release() release}
-     * methods for this semaphore, the current thread is next to be assigned
-     * permits and the number of available permits satisfies this request.
+     * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+     * and lies dormant until some other thread invokes one of the {@link #release() release} methods for this
+     * semaphore, the current thread is next to be assigned permits and the number of available permits satisfies this
+     * request.
      *
-     * <p>If the current thread is {@linkplain Thread#interrupt interrupted}
-     * while waiting for permits then it will continue to wait and its
-     * position in the queue is not affected.  When the thread does return
-     * from this method its interrupt status will be set.
+     * <p>If the current thread is {@linkplain Thread#interrupt interrupted} while waiting for permits then it will
+     * continue to wait and its position in the queue is not affected.  When the thread does return from this method its
+     * interrupt status will be set.
      *
      * @param permits the number of permits to acquire
      * @throws IllegalArgumentException if {@code permits} is negative
      */
     public void acquireUninterruptibly(int permits);
 
-
     /**
      * Returns the current number of permits available in this semaphore.
      *
@@ -187,131 +141,91 @@ public interface IgniteSemaphore extends Closeable{
     /**
      * Releases a permit, returning it to the semaphore.
      *
-     * <p>Releases a permit, increasing the number of available permits by
-     * one.  If any threads are trying to acquire a permit, then one is
-     * selected and given the permit that was just released.  That thread
-     * is (re)enabled for thread scheduling purposes.
+     * <p>Releases a permit, increasing the number of available permits by one.  If any threads are trying to acquire a
+     * permit, then one is selected and given the permit that was just released.  That thread is (re)enabled for thread
+     * scheduling purposes.
      *
-     * <p>There is no requirement that a thread that releases a permit must
-     * have acquired that permit by calling {@link #acquire}.
-     * Correct usage of a semaphore is established by programming convention
-     * in the application.
+     * <p>There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link
+     * #acquire}. Correct usage of a semaphore is established by programming convention in the application.
      */
     public void release();
 
     /**
-     * Acquires the given number of permits from this semaphore, if all
-     * become available within the given waiting time and the current
-     * thread has not been {@linkplain Thread#interrupt interrupted}.
+     * Acquires the given number of permits from this semaphore, if all become available within the given waiting time
+     * and the current thread has not been {@linkplain Thread#interrupt interrupted}.
      *
-     * <p>Acquires the given number of permits, if they are available and
-     * returns immediately, with the value {@code true},
-     * reducing the number of available permits by the given amount.
-     *
-     * <p>If insufficient permits are available then
-     * the current thread becomes disabled for thread scheduling
-     * purposes and lies dormant until one of three things happens:
-     * <ul>
-     * <li>Some other thread invokes one of the {@link #release() release}
-     * methods for this semaphore, the current thread is next to be assigned
-     * permits and the number of available permits satisfies this request; or
-     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
-     * the current thread; or
-     * <li>The specified waiting time elapses.
-     * </ul>
+     * <p>Acquires the given number of permits, if they are available and returns immediately, with the value {@code
+     * true}, reducing the number of available permits by the given amount.
+     *
+     * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+     * and lies dormant until one of three things happens: <ul> <li>Some other thread invokes one of the {@link
+     * #release() release} methods for this semaphore, the current thread is next to be assigned permits and the number
+     * of available permits satisfies this request; or <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread; or <li>The specified waiting time elapses. </ul>
      *
      * <p>If the permits are acquired then the value {@code true} is returned.
      *
-     * <p>If the current thread:
-     * <ul>
-     * <li>has its interrupted status set on entry to this method; or
-     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
-     * to acquire the permits,
-     * </ul>
-     * then {@link IgniteInterruptedException} is thrown and the current thread's
-     * interrupted status is cleared.
-     * Any permits that were to be assigned to this thread, are instead
-     * assigned to other threads trying to acquire permits, as if
-     * the permits had been made available by a call to {@link #release()}.
-     *
-     * <p>If the specified waiting time elapses then the value {@code false}
-     * is returned.  If the time is less than or equal to zero, the method
-     * will not wait at all.  Any permits that were to be assigned to this
-     * thread, are instead assigned to other threads trying to acquire
-     * permits, as if the permits had been made available by a call to
+     * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+     * Thread#interrupt interrupted} while waiting to acquire the permits, </ul> then {@link IgniteInterruptedException}
+     * is thrown and the current thread's interrupted status is cleared. Any permits that were to be assigned to this
+     * thread, are instead assigned to other threads trying to acquire permits, as if the permits had been made
+     * available by a call to {@link #release()}.
+     *
+     * <p>If the specified waiting time elapses then the value {@code false} is returned.  If the time is less than or
+     * equal to zero, the method will not wait at all.  Any permits that were to be assigned to this thread, are instead
+     * assigned to other threads trying to acquire permits, as if the permits had been made available by a call to
      * {@link #release()}.
      *
      * @param permits the number of permits to acquire
      * @param timeout the maximum time to wait for the permits
      * @param unit the time unit of the {@code timeout} argument
-     * @return {@code true} if all permits were acquired and {@code false}
-     *         if the waiting time elapsed before all permits were acquired
+     * @return {@code true} if all permits were acquired and {@code false} if the waiting time elapsed before all
+     * permits were acquired
      * @throws IgniteInterruptedException if the current thread is interrupted
      * @throws IllegalArgumentException if {@code permits} is negative
      */
     public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
-            throws IgniteInterruptedException;
+        throws IgniteInterruptedException;
 
     /**
-     * Acquires the given number of permits from this semaphore, only
-     * if all are available at the time of invocation.
-     *
-     * <p>Acquires the given number of permits, if they are available, and
-     * returns immediately, with the value {@code true},
-     * reducing the number of available permits by the given amount.
-     *
-     * <p>If insufficient permits are available then this method will return
-     * immediately with the value {@code false} and the number of available
-     * permits is unchanged.
-     *
-     * <p>Even when this semaphore has been set to use a fair ordering
-     * policy, a call to {@code tryAcquire} <em>will</em>
-     * immediately acquire a permit if one is available, whether or
-     * not other threads are currently waiting.  This
-     * &quot;barging&quot; behavior can be useful in certain
-     * circumstances, even though it breaks fairness. If you want to
-     * honor the fairness setting, then use {@link #tryAcquire(int,
-     * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) }
-     * which is almost equivalent (it also detects interruption).
+     * Acquires the given number of permits from this semaphore, only if all are available at the time of invocation.
+     *
+     * <p>Acquires the given number of permits, if they are available, and returns immediately, with the value {@code
+     * true}, reducing the number of available permits by the given amount.
+     *
+     * <p>If insufficient permits are available then this method will return immediately with the value {@code false}
+     * and the number of available permits is unchanged.
+     *
+     * <p>Even when this semaphore has been set to use a fair ordering policy, a call to {@code tryAcquire}
+     * <em>will</em> immediately acquire a permit if one is available, whether or not other threads are currently
+     * waiting.  This &quot;barging&quot; behavior can be useful in certain circumstances, even though it breaks
+     * fairness. If you want to honor the fairness setting, then use {@link #tryAcquire(int, long, TimeUnit)
+     * tryAcquire(permits, 0, TimeUnit.SECONDS) } which is almost equivalent (it also detects interruption).
      *
      * @param permits the number of permits to acquire
-     * @return {@code true} if the permits were acquired and
-     *         {@code false} otherwise
+     * @return {@code true} if the permits were acquired and {@code false} otherwise
      * @throws IllegalArgumentException if {@code permits} is negative
      */
     public boolean tryAcquire(int permits);
 
     /**
-     * Acquires the given number of permits from this semaphore,
-     * blocking until all are available,
-     * or the thread is {@linkplain Thread#interrupt interrupted}.
-     *
-     * <p>Acquires the given number of permits, if they are available,
-     * and returns immediately, reducing the number of available permits
-     * by the given amount.
-     *
-     * <p>If insufficient permits are available then the current thread becomes
-     * disabled for thread scheduling purposes and lies dormant until
-     * one of two things happens:
-     * <ul>
-     * <li>Some other thread invokes one of the {@link #release() release}
-     * methods for this semaphore, the current thread is next to be assigned
-     * permits and the number of available permits satisfies this request; or
-     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
-     * the current thread.
-     * </ul>
-     *
-     * <p>If the current thread:
-     * <ul>
-     * <li>has its interrupted status set on entry to this method; or
-     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
-     * for a permit,
-     * </ul>
-     * then {@link IgniteInterruptedException} is thrown and the current thread's
-     * interrupted status is cleared.
-     * Any permits that were to be assigned to this thread are instead
-     * assigned to other threads trying to acquire permits, as if
-     * permits had been made available by a call to {@link #release()}.
+     * Acquires the given number of permits from this semaphore, blocking until all are available, or the thread is
+     * {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>Acquires the given number of permits, if they are available, and returns immediately, reducing the number of
+     * available permits by the given amount.
+     *
+     * <p>If insufficient permits are available then the current thread becomes disabled for thread scheduling purposes
+     * and lies dormant until one of two things happens: <ul> <li>Some other thread invokes one of the {@link #release()
+     * release} methods for this semaphore, the current thread is next to be assigned permits and the number of
+     * available permits satisfies this request; or <li>Some other thread {@linkplain Thread#interrupt interrupts} the
+     * current thread. </ul>
+     *
+     * <p>If the current thread: <ul> <li>has its interrupted status set on entry to this method; or <li>is {@linkplain
+     * Thread#interrupt interrupted} while waiting for a permit, </ul> then {@link IgniteInterruptedException} is thrown
+     * and the current thread's interrupted status is cleared. Any permits that were to be assigned to this thread are
+     * instead assigned to other threads trying to acquire permits, as if permits had been made available by a call to
+     * {@link #release()}.
      *
      * @param permits the number of permits to acquire
      * @throws IgniteInterruptedException if the current thread is interrupted
@@ -322,21 +236,16 @@ public interface IgniteSemaphore extends Closeable{
     /**
      * Releases the given number of permits, returning them to the semaphore.
      *
-     * <p>Releases the given number of permits, increasing the number of
-     * available permits by that amount.
-     * If any threads are trying to acquire permits, then one
-     * is selected and given the permits that were just released.
-     * If the number of available permits satisfies that thread's request
-     * then that thread is (re)enabled for thread scheduling purposes;
-     * otherwise the thread will wait until sufficient permits are available.
-     * If there are still permits available
-     * after this thread's request has been satisfied, then those permits
-     * are assigned in turn to other threads trying to acquire permits.
-     *
-     * <p>There is no requirement that a thread that releases a permit must
-     * have acquired that permit by calling {@link IgniteSemaphore#acquire acquire}.
-     * Correct usage of a semaphore is established by programming convention
-     * in the application.
+     * <p>Releases the given number of permits, increasing the number of available permits by that amount. If any
+     * threads are trying to acquire permits, then one is selected and given the permits that were just released. If the
+     * number of available permits satisfies that thread's request then that thread is (re)enabled for thread scheduling
+     * purposes; otherwise the thread will wait until sufficient permits are available. If there are still permits
+     * available after this thread's request has been satisfied, then those permits are assigned in turn to other
+     * threads trying to acquire permits.
+     *
+     * <p>There is no requirement that a thread that releases a permit must have acquired that permit by calling {@link
+     * IgniteSemaphore#acquire acquire}. Correct usage of a semaphore is established by programming convention in the
+     * application.
      *
      * @param permits the number of permits to release
      * @throws IllegalArgumentException if {@code permits} is negative
@@ -351,23 +260,18 @@ public interface IgniteSemaphore extends Closeable{
     public boolean isFair();
 
     /**
-     * Queries whether any threads are waiting to acquire. Note that
-     * because cancellations may occur at any time, a {@code true}
-     * return does not guarantee that any other thread will ever
-     * acquire.  This method is designed primarily for use in
-     * monitoring of the system state.
-     *
-     * @return {@code true} if there may be other threads waiting to
-     *         acquire the lock
+     * Queries whether any threads are waiting to acquire. Note that because cancellations may occur at any time, a
+     * {@code true} return does not guarantee that any other thread will ever acquire.  This method is designed
+     * primarily for use in monitoring of the system state.
+     *
+     * @return {@code true} if there may be other threads waiting to acquire the lock
      */
     public boolean hasQueuedThreads();
 
     /**
-     * Returns an estimate of the number of threads waiting to acquire.
-     * The value is only an estimate because the number of threads may
-     * change dynamically while this method traverses internal data
-     * structures.  This method is designed for use in monitoring of the
-     * system state, not for synchronization control.
+     * Returns an estimate of the number of threads waiting to acquire. The value is only an estimate because the number
+     * of threads may change dynamically while this method traverses internal data structures.  This method is designed
+     * for use in monitoring of the system state, not for synchronization control.
      *
      * @return the estimated number of threads waiting for this lock
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
index 0f939d5..8ecbcc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreEx.java
@@ -3,7 +3,7 @@ package org.apache.ignite.internal.processors.datastructures;
 import org.apache.ignite.IgniteSemaphore;
 
 /**
- * Created by vladisav on 20.9.15..
+ * Grid cache semaphore ({@code 'Ex'} stands for external).
  */
 public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovable {
     /**
@@ -16,7 +16,7 @@ public interface GridCacheSemaphoreEx extends IgniteSemaphore, GridCacheRemovabl
     /**
      * Callback to notify semaphore on changes.
      *
-     * @param val Id of the caller and number of permissions to acquire (or release; can be negative).
+     * @param val State containing the number of available permissions.
      */
     public void onUpdate(GridCacheSemaphoreState val);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index 24c3ec5..17efc61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -1,6 +1,20 @@
 package org.apache.ignite.internal.processors.datastructures;
 
-import org.apache.ignite.*;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -11,26 +25,16 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
 /**
- * Cache semaphore implementation based on AbstractQueuedSynchronizer.
- * Current implementation supports only unfair and locally fair modes.
- * When fairness set false, this class makes no guarantees about the order in which threads acquire permits.
- * When fairness is set true, the semaphore only guarantees that local threads invoking any of the acquire methods
- * are selected to obtain permits in the order in which their invocation of those methods was processed (FIFO).
- *
- * @author Vladisav Jelisavcic
+ * Cache semaphore implementation based on AbstractQueuedSynchronizer. Current implementation supports only unfair and
+ * locally fair modes. When fairness set false, this class makes no guarantees about the order in which threads acquire
+ * permits. When fairness is set true, the semaphore only guarantees that local threads invoking any of the acquire
+ * methods are selected to obtain permits in the order in which their invocation of those methods was processed (FIFO).
  */
 public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Externalizable {
     /** */
@@ -38,11 +42,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
     /** Deserialization stash. */
     private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
-            new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
-                @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
-                    return F.t2();
-                }
-            };
+        new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
+            @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
+                return F.t2();
+            }
+        };
 
     /** Logger. */
     private IgniteLogger log;
@@ -50,7 +54,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     /** Semaphore name. */
     private String name;
 
-    /** Removed flag.*/
+    /** Removed flag. */
     private volatile boolean rmvd;
 
     /** Semaphore key. */
@@ -86,14 +90,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     }
 
     /**
-     * Synchronization implementation for semaphore.  Uses AQS state
-     * to represent permits. Subclassed into fair and nonfair
-     * versions.
+     * Synchronization implementation for semaphore.  Uses AQS state to represent permits. Subclassed into fair and
+     * nonfair versions.
      */
     abstract class Sync extends AbstractQueuedSynchronizer {
         private static final long serialVersionUID = 1192457210091910933L;
 
-        protected ConcurrentMap<Thread,Integer> threadMap;
+        protected final ConcurrentMap<Thread, Integer> threadMap;
         protected int totalWaiters;
 
         Sync(int permits) {
@@ -101,7 +104,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
             threadMap = new ConcurrentHashMap<>();
         }
 
-        protected synchronized void setWaiters(int waiters){
+        protected synchronized void setWaiters(int waiters) {
             totalWaiters = waiters;
         }
 
@@ -109,7 +112,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
             return totalWaiters;
         }
 
-        final synchronized void setPermits(int permits){
+        final synchronized void setPermits(int permits) {
             setState(permits);
         }
 
@@ -118,13 +121,14 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
 
         final int nonfairTryAcquireShared(int acquires) {
-            for (;;) {
+            for (; ; ) {
                 int available = getState();
+
                 int remaining = available - acquires;
 
                 if (remaining < 0 || compareAndSetGlobalState(available, remaining)) {
-                    if(remaining < 0){
-                        if(!threadMap.containsKey(Thread.currentThread()))
+                    if (remaining < 0) {
+                        if (!threadMap.containsKey(Thread.currentThread()))
                             getAndIncWaitingCount();
                     }
 
@@ -136,33 +140,41 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         protected final boolean tryReleaseShared(int releases) {
             // Check if some other node updated the state.
             // This method is called with release==0 only when trying to wake through update.
-            if(releases == 0)
+            if (releases == 0)
                 return true;
 
-            for (;;) {
+            for (; ; ) {
                 int current = getState();
+
                 int next = current + releases;
+
                 if (next < current) // overflow
                     throw new Error("Maximum permit count exceeded");
+
                 if (compareAndSetGlobalState(current, next))
                     return true;
             }
         }
 
         final void reducePermits(int reductions) {
-            for (;;) {
+            for (; ; ) {
                 int current = getState();
+
                 int next = current - reductions;
+
                 if (next > current) // underflow
                     throw new Error("Permit count underflow");
+
                 if (compareAndSetGlobalState(current, next))
                     return;
             }
         }
 
         final int drainPermits() {
-            for (;;) {
+            for (; ; ) {
+
                 int current = getState();
+
                 if (current == 0 || compareAndSetGlobalState(current, 0))
                     return current;
             }
@@ -171,34 +183,40 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         protected void getAndIncWaitingCount() {
             try {
                 CU.outTx(
-                        retryTopologySafe(new Callable<Boolean>() {
-                            @Override
-                            public Boolean call() throws Exception {
-                                try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
-                                    GridCacheSemaphoreState val = semaphoreView.get(key);
+                    retryTopologySafe(new Callable<Boolean>() {
+                        @Override
+                        public Boolean call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                                GridCacheSemaphoreState val = semaphoreView.get(key);
 
-                                    if (val == null)
-                                        throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+                                if (val == null)
+                                    throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
 
-                                    int waiting = val.getWaiters();
-                                    sync.threadMap.put(Thread.currentThread(), waiting);
+                                int waiting = val.getWaiters();
 
-                                    waiting++;
-                                    val.setWaiters(waiting);
-                                    semaphoreView.put(key, val);
-                                    tx.commit();
+                                sync.threadMap.put(Thread.currentThread(), waiting);
 
-                                    return true;
-                                } catch (Error | Exception e) {
-                                    U.error(log, "Failed to compare and set: " + this, e);
+                                waiting++;
 
-                                    throw e;
-                                }
+                                val.setWaiters(waiting);
+
+                                semaphoreView.put(key, val);
+
+                                tx.commit();
+
+                                return true;
+                            }
+                            catch (Error | Exception e) {
+                                U.error(log, "Failed to compare and set: " + this, e);
+
+                                throw e;
                             }
-                        }),
-                        ctx
+                        }
+                    }),
+                    ctx
                 );
-            } catch (IgniteCheckedException e) {
+            }
+            catch (IgniteCheckedException e) {
                 throw U.convertException(e);
             }
         }
@@ -206,44 +224,48 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         protected boolean compareAndSetGlobalState(final int expVal, final int newVal) {
             try {
                 return CU.outTx(
-                        retryTopologySafe(new Callable<Boolean>() {
-                            @Override
-                            public Boolean call() throws Exception {
-                                try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
-                                    GridCacheSemaphoreState val = semaphoreView.get(key);
+                    retryTopologySafe(new Callable<Boolean>() {
+                        @Override
+                        public Boolean call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                                GridCacheSemaphoreState val = semaphoreView.get(key);
 
-                                    if (val == null)
-                                        throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
+                                if (val == null)
+                                    throw new IgniteCheckedException("Failed to find semaphore with given name: " + name);
 
-                                    boolean retVal = val.getCnt() == expVal;
+                                boolean retVal = val.getCnt() == expVal;
 
-                                    if (retVal) {
+                                if (retVal) {
                                         /* If current thread is queued, than this call is the call that is going to be unblocked. */
-                                        if(sync.isQueued(Thread.currentThread())) {
-
-                                            int waiting = val.getWaiters() - 1;
-                                            val.setWaiters(waiting);
+                                    if (sync.isQueued(Thread.currentThread())) {
 
-                                            sync.threadMap.remove(Thread.currentThread());
-                                        }
+                                        int waiting = val.getWaiters() - 1;
 
-                                        val.setCnt(newVal);
+                                        val.setWaiters(waiting);
 
-                                        semaphoreView.put(key, val);
-                                        tx.commit();
+                                        sync.threadMap.remove(Thread.currentThread());
                                     }
 
-                                    return retVal;
-                                } catch (Error | Exception e) {
-                                    U.error(log, "Failed to compare and set: " + this, e);
+                                    val.setCnt(newVal);
+
+                                    semaphoreView.put(key, val);
 
-                                    throw e;
+                                    tx.commit();
                                 }
+
+                                return retVal;
+                            }
+                            catch (Error | Exception e) {
+                                U.error(log, "Failed to compare and set: " + this, e);
+
+                                throw e;
                             }
-                        }),
-                        ctx
+                        }
+                    }),
+                    ctx
                 );
-            } catch( IgniteCheckedException e){
+            }
+            catch (IgniteCheckedException e) {
                 throw U.convertException(e);
             }
         }
@@ -275,16 +297,17 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
 
         protected int tryAcquireShared(int acquires) {
-            for (;;) {
+            for (; ; ) {
                 if (hasQueuedPredecessors())
                     return -1;
 
                 int available = getState();
+
                 int remaining = available - acquires;
 
                 if (remaining < 0 || compareAndSetGlobalState(available, remaining)) {
-                    if(remaining < 0){
-                        if(!threadMap.containsKey(Thread.currentThread()))
+                    if (remaining < 0) {
+                        if (!threadMap.containsKey(Thread.currentThread()))
                             getAndIncWaitingCount();
                     }
                     return remaining;
@@ -292,7 +315,6 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
             }
         }
 
-
     }
 
     /**
@@ -305,12 +327,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
      * @param ctx Cache context.
      */
     public GridCacheSemaphoreImpl(String name,
-                                       int initCnt,
-                                       boolean fair,
-                                       GridCacheInternalKey key,
-                                       IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView,
-                                       GridCacheContext ctx)
-    {
+        int initCnt,
+        boolean fair,
+        GridCacheInternalKey key,
+        IgniteInternalCache<GridCacheInternalKey, GridCacheSemaphoreState> semaphoreView,
+        GridCacheContext ctx) {
         assert name != null;
         assert key != null;
         assert semaphoreView != null;
@@ -333,28 +354,30 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         if (initGuard.compareAndSet(false, true)) {
             try {
                 sync = CU.outTx(
-                        retryTopologySafe(new Callable<Sync>() {
-                            @Override
-                            public Sync call() throws Exception {
-                                try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
-                                    GridCacheSemaphoreState val = semaphoreView.get(key);
+                    retryTopologySafe(new Callable<Sync>() {
+                        @Override
+                        public Sync call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                                GridCacheSemaphoreState val = semaphoreView.get(key);
 
-                                    if (val == null) {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Failed to find semaphore with given name: " + name);
+                                if (val == null) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to find semaphore with given name: " + name);
 
-                                        return null;
-                                    }
+                                    return null;
+                                }
 
-                                    final int count = val.getCnt();
-                                    tx.commit();
+                                final int count = val.getCnt();
 
-                                    return val.isFair() ? new FairSync(count) : new NonfairSync(count);
-                                }
+                                tx.commit();
+
+                                return val.isFair() ? new FairSync(count) : new NonfairSync(count);
                             }
-                        }),
-                        ctx
+                        }
+                    }),
+                    ctx
                 );
+
                 if (log.isDebugEnabled())
                     log.debug("Initialized internal sync structure: " + sync);
             }
@@ -370,17 +393,20 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         }
     }
 
-
     /** {@inheritDoc} */
     @Override public String name() {
         return name;
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheInternalKey key() { return key; }
+    @Override public GridCacheInternalKey key() {
+        return key;
+    }
 
     /** {@inheritDoc} */
-    @Override public boolean removed(){ return rmvd; }
+    @Override public boolean removed() {
+        return rmvd;
+    }
 
     /** {@inheritDoc} */
     @Override public boolean onRemoved() {
@@ -389,7 +415,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
 
     /** {@inheritDoc} */
     @Override public void onUpdate(GridCacheSemaphoreState val) {
-        if(sync == null)
+        if (sync == null)
             return;
 
         // Update permission count.
@@ -415,25 +441,28 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     @Override
     public void acquire(int permits) throws IgniteInterruptedException {
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
         try {
             initializeSemaphore();
-            sync.acquireSharedInterruptibly(permits);
 
-        } catch (IgniteCheckedException e) {
+            sync.acquireSharedInterruptibly(permits);
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
-        } catch (InterruptedException e) {
+        }
+        catch (InterruptedException e) {
             throw new IgniteInterruptedException(e);
         }
     }
 
-
     @Override
     public void acquireUninterruptibly() {
         try {
             initializeSemaphore();
-            sync.acquireShared(1);
 
-        } catch (IgniteCheckedException e) {
+            sync.acquireShared(1);
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }
@@ -443,38 +472,41 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
         try {
             initializeSemaphore();
-            sync.acquireShared(permits);
 
-        } catch (IgniteCheckedException e) {
+            sync.acquireShared(permits);
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }
 
     @Override
-    public int availablePermits(){
+    public int availablePermits() {
         int ret;
         try {
             initializeSemaphore();
             ret = CU.outTx(
-                    retryTopologySafe(new Callable<Integer>() {
-                        @Override
-                        public Integer call() throws Exception {
-                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
-                                GridCacheSemaphoreState val = semaphoreView.get(key);
+                retryTopologySafe(new Callable<Integer>() {
+                    @Override
+                    public Integer call() throws Exception {
+                        try (IgniteInternalTx tx = CU.txStartInternal(ctx, semaphoreView, PESSIMISTIC, REPEATABLE_READ)) {
+                            GridCacheSemaphoreState val = semaphoreView.get(key);
 
-                                if (val == null)
-                                    throw new IgniteException("Failed to find semaphore with given name: " + name);
+                            if (val == null)
+                                throw new IgniteException("Failed to find semaphore with given name: " + name);
 
-                                int count = val.getCnt();
-                                tx.rollback();
+                            int count = val.getCnt();
 
-                                return count;
-                            }
+                            tx.rollback();
+
+                            return count;
                         }
-                    }),
-                    ctx
+                    }
+                }),
+                ctx
             );
-        } catch (IgniteCheckedException e) {
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
         return ret;
@@ -484,9 +516,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     public int drainPermits() {
         try {
             initializeSemaphore();
-            return sync.drainPermits();
 
-        } catch (IgniteCheckedException e) {
+            return sync.drainPermits();
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }
@@ -495,9 +528,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     public boolean tryAcquire() {
         try {
             initializeSemaphore();
-            return sync.nonfairTryAcquireShared(1) >= 0;
 
-        } catch (IgniteCheckedException e) {
+            return sync.nonfairTryAcquireShared(1) >= 0;
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }
@@ -506,11 +540,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     public boolean tryAcquire(long timeout, TimeUnit unit) throws IgniteException {
         try {
             initializeSemaphore();
-            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
 
-        } catch (IgniteCheckedException e) {
+            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
-        } catch (InterruptedException e) {
+        }
+        catch (InterruptedException e) {
             throw new IgniteInterruptedException(e);
         }
     }
@@ -523,11 +559,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     @Override
     public void release(int permits) {
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
         try {
             initializeSemaphore();
-            sync.releaseShared(permits);
 
-        } catch (IgniteCheckedException e) {
+            sync.releaseShared(permits);
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }
@@ -535,11 +573,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     @Override
     public boolean tryAcquire(int permits) {
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
+
         try {
             initializeSemaphore();
-            return sync.nonfairTryAcquireShared(permits) >= 0;
 
-        } catch (IgniteCheckedException e) {
+            return sync.nonfairTryAcquireShared(permits) >= 0;
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }
@@ -549,11 +589,13 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
         A.ensure(permits >= 0, "Number of permits must be non-negative.");
         try {
             initializeSemaphore();
-            return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
 
-        } catch (IgniteCheckedException e) {
+            return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
-        } catch (InterruptedException e) {
+        }
+        catch (InterruptedException e) {
             throw new IgniteInterruptedException(e);
         }
     }
@@ -567,8 +609,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     public boolean hasQueuedThreads() {
         try {
             initializeSemaphore();
-            return sync.getWaiters()!=0;
-        } catch (IgniteCheckedException e) {
+
+            return sync.getWaiters() != 0;
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }
@@ -577,8 +621,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter
     public int getQueueLength() {
         try {
             initializeSemaphore();
+
             return sync.getWaiters();
-        } catch (IgniteCheckedException e) {
+        }
+        catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
index cf44b7d..a02b7c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreState.java
@@ -1,18 +1,14 @@
 package org.apache.ignite.internal.processors.datastructures;
 
-import org.apache.ignite.internal.processors.cache.GridCacheInternal;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  * Grid cache semaphore state.
- *
- * @author Vladisav Jelisavcic
  */
 public class GridCacheSemaphoreState implements GridCacheInternal, Externalizable, Cloneable {
     /** */
@@ -33,7 +29,6 @@ public class GridCacheSemaphoreState implements GridCacheInternal, Externalizabl
      */
     private boolean fair;
 
-
     /**
      * Constructor.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/be332a82/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
deleted file mode 100644
index 689b647..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreValue.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.apache.ignite.internal.processors.datastructures;
-
-import org.apache.ignite.internal.processors.cache.GridCacheInternal;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-
-/**
- * Created by vladisav on 20.9.15..
- */
-public class GridCacheSemaphoreValue implements GridCacheInternal, Externalizable, Cloneable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /**
-     * Permission count.
-     */
-    private int cnt;
-
-    /**
-     * Semaphore ID.
-     */
-    private long semaphoreId;
-
-    /**
-     * Constructor.
-     *
-     * @param cnt Number of permissions.
-     * @param
-     */
-    public GridCacheSemaphoreValue(int cnt, long semaphoreId) {
-        this.cnt = cnt;
-
-        this.semaphoreId = semaphoreId;
-    }
-
-    /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    public GridCacheSemaphoreValue() {
-        // No-op.
-    }
-
-    /**
-     * @param cnt New count.
-     */
-    public void set(int cnt) {
-        this.cnt = cnt;
-    }
-
-    /**
-     * @return Current count.
-     */
-    public int get() {
-        return cnt;
-    }
-
-    /**
-     * @return true if number of permissions to be added is positive
-     */
-    public boolean isRelease(){
-        return cnt>0;
-    }
-
-    /**
-     * @return true if permission count should be lowered
-     */
-    public boolean isAwait(){
-        return cnt<0;
-    }
-
-    /**
-     * @return Semaphore ID.
-     */
-    public long semaphoreId() {
-        return semaphoreId;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Object clone() throws CloneNotSupportedException {
-        return super.clone();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeInt(cnt);
-        out.writeLong(semaphoreId);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void readExternal(ObjectInput in) throws IOException {
-        cnt = in.readInt();
-        semaphoreId = in.readLong();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public String toString() {
-        return S.toString(GridCacheSemaphoreValue.class, this);
-    }
-}
\ No newline at end of file


Mime
View raw message