Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DD3A7182F2 for ; Fri, 20 Nov 2015 14:39:57 +0000 (UTC) Received: (qmail 87095 invoked by uid 500); 20 Nov 2015 14:39:57 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 87054 invoked by uid 500); 20 Nov 2015 14:39:57 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 87041 invoked by uid 99); 20 Nov 2015 14:39:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Nov 2015 14:39:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8D255E01BA; Fri, 20 Nov 2015 14:39:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dmagda@apache.org To: commits@ignite.apache.org Date: Fri, 20 Nov 2015 14:39:57 -0000 Message-Id: <8e9c55b2ee4c42108345cae82d821666@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] ignite git commit: ignite-638: Implement IgniteSemaphore data structure Repository: ignite Updated Branches: refs/heads/ignite-1.5 900788b68 -> 8e7e33090 http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index 2fd40f6..85a26ad 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -27,7 +27,9 @@ import org.apache.ignite.IgniteAtomicReference; import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteAtomicStamped; import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.configuration.AtomicConfiguration; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -514,6 +516,277 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig /** * @throws Exception If failed. */ + public void testSemaphoreTopologyChange() throws Exception { + + try (IgniteSemaphore semaphore = grid(0).semaphore(STRUCTURE_NAME, 20, true, true)) { + try { + Ignite g = startGrid(NEW_GRID_NAME); + + assert g.semaphore(STRUCTURE_NAME, 20, true, true).availablePermits() == 20; + + g.semaphore(STRUCTURE_NAME, 20, true, true).acquire(10); + + stopGrid(NEW_GRID_NAME); + + assert grid(0).semaphore(STRUCTURE_NAME, 20, true, true).availablePermits() == 10; + } + finally { + grid(0).semaphore(STRUCTURE_NAME, 20, true, true).close(); + } + } + + } + + /** + * @throws Exception If failed. + */ + public void testSemaphoreConstantTopologyChange() throws Exception { + try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, 10, false, true)) { + try { + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + try { + for (int i = 0; i < TOP_CHANGE_CNT; i++) { + String name = UUID.randomUUID().toString(); + + try { + Ignite g = startGrid(name); + + assert g.semaphore(STRUCTURE_NAME, 10, false, false) != null; + } + finally { + if (i != TOP_CHANGE_CNT - 1) + stopGrid(name); + } + } + } + catch (Exception e) { + throw F.wrap(e); + } + } + }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + + int val = s.availablePermits(); + + while (!fut.isDone()) { + assert s.availablePermits() == val; + + s.acquire(); + + assert s.availablePermits() == val - 1; + + s.release(); + } + + fut.get(); + + for (Ignite g : G.allGrids()) + assert g.semaphore(STRUCTURE_NAME, 0, false, true).availablePermits() == val; + } + finally { + grid(0).semaphore(STRUCTURE_NAME, 0, false, true).close(); + } + } + } + + /** + * This method tests if permits are successfully reassigned when a node fails in failoverSafe mode. + * + * @throws Exception If failed. + */ + public void testSemaphoreConstantTopologyChangeFailoverSafe() throws Exception { + try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true)) { + try { + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + try { + for (int i = 0; i < TOP_CHANGE_CNT; i++) { + String name = UUID.randomUUID().toString(); + + try { + Ignite g = startGrid(name); + + final IgniteSemaphore sem = g.semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true); + + assertNotNull(sem); + + sem.acquire(); + + if (i == TOP_CHANGE_CNT - 1) { + sem.release(); + } + } + finally { + if (i != TOP_CHANGE_CNT - 1) { + stopGrid(name); + } + } + } + } + catch (Exception e) { + throw F.wrap(e); + } + } + }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + + while (!fut.isDone()) { + s.release(); + + s.acquire(); + } + + fut.get(); + + int val = s.availablePermits(); + + assertEquals(val, TOP_CHANGE_CNT); + + for (Ignite g : G.allGrids()) + assertEquals(val, g.semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true).availablePermits()); + } + finally { + grid(0).semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true).close(); + } + } + } + + /** + * This method tests if permits are successfully reassigned when multiple nodes fail in failoverSafe mode. + * + * @throws Exception If failed. + */ + public void testSemaphoreConstantMultipleTopologyChangeFailoverSafe() throws Exception { + final int numPermits = 3; + + try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, numPermits, true, true)) { + try { + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + try { + for (int i = 0; i < TOP_CHANGE_CNT; i++) { + Collection names = new GridLeanSet<>(3); + + try { + for (int j = 0; j < numPermits; j++) { + String name = UUID.randomUUID().toString(); + + names.add(name); + + Ignite g = startGrid(name); + + final IgniteSemaphore sem = g.semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true); + + assertNotNull(sem); + + sem.acquire(); + + if (i == TOP_CHANGE_CNT - 1) { + sem.release(); + } + } + } + finally { + if (i != TOP_CHANGE_CNT - 1) + for (String name : names) { + stopGrid(name); + + awaitPartitionMapExchange(); + } + } + } + } + catch (Exception e) { + throw F.wrap(e); + } + } + }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + + while (!fut.isDone()) { + s.release(); + + s.acquire(); + } + + fut.get(); + + int val = s.availablePermits(); + + assertEquals(val, numPermits); + + for (Ignite g : G.allGrids()) + assertEquals(val, g.semaphore(STRUCTURE_NAME, 0, true, true).availablePermits()); + } + finally { + grid(0).semaphore(STRUCTURE_NAME, 0, true, true).close(); + } + } + } + + /** + * This method test if exception is thrown when node fails in non FailoverSafe mode. + * + * @throws Exception If failed. + */ + public void testSemaphoreConstantTopologyChangeNotFailoverSafe() throws Exception { + try (IgniteSemaphore s = grid(0).semaphore(STRUCTURE_NAME, 1, false, true)) { + try { + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new CA() { + @Override public void apply() { + try { + for (int i = 0; i < 2; i++) { + String name = UUID.randomUUID().toString(); + + try { + Ignite g = startGrid(name); + + final IgniteSemaphore sem = g.semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true); + + assertNotNull(sem); + + if (i != 1) { + sem.acquire(); + } + + } + finally { + if (i != 1) { + stopGrid(name); + } + } + } + + } + catch (Exception e) { + throw F.wrap(e); + } + } + }, TOP_CHANGE_THREAD_CNT, "topology-change-thread"); + + while (s.availablePermits() != 0) { + // Wait for semaphore to be acquired. + } + + try { + s.acquire(); + fail("In non-FailoverSafe mode IgniteInterruptedCheckedException must be thrown."); + } + catch (Exception e) { + assert (e instanceof IgniteInterruptedException); + } + + assertTrue(s.isBroken()); + + fut.get(); + } + finally { + grid(0).semaphore(STRUCTURE_NAME, TOP_CHANGE_CNT, true, true).close(); + } + } + } + + /** + * @throws Exception If failed. + */ public void testCountDownLatchConstantTopologyChange() throws Exception { try (IgniteCountDownLatch s = grid(0).countDownLatch(STRUCTURE_NAME, Integer.MAX_VALUE, false, true)) { try { @@ -928,4 +1201,4 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig assert grid(0).atomicLong(STRUCTURE_NAME, val, false).get() == val + 1; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java index 989f75f..bf6dcda 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java @@ -24,6 +24,7 @@ import org.apache.ignite.IgniteAtomicLong; import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteSet; import org.apache.ignite.configuration.CollectionConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -267,6 +268,62 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA /** * @throws Exception If failed. */ + public void testSemaphore() throws Exception { + Ignite clientNode = clientIgnite(); + Ignite srvNode = serverNode(); + + testSemaphore(clientNode, srvNode); + testSemaphore(srvNode, clientNode); + } + + /** + * @param creator Creator node. + * @param other Other node. + * @throws Exception If failed. + */ + private void testSemaphore(Ignite creator, final Ignite other) throws Exception { + assertNull(creator.semaphore("semaphore1", 1, true, false)); + assertNull(other.semaphore("semaphore1", 1, true, false)); + + try (IgniteSemaphore semaphore = creator.semaphore("semaphore1", -1, true, true)) { + assertNotNull(semaphore); + + assertEquals(-1, semaphore.availablePermits()); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + U.sleep(1000); + + IgniteSemaphore semaphore0 = other.semaphore("semaphore1", -1, true, false); + + assertEquals(-1, semaphore0.availablePermits()); + + log.info("Release semaphore."); + + semaphore0.release(2); + + return null; + } + }); + + log.info("Acquire semaphore."); + + assertTrue(semaphore.tryAcquire(1, 5000, TimeUnit.MILLISECONDS)); + + log.info("Finished wait."); + + fut.get(); + + assertEquals(0, semaphore.availablePermits()); + } + + assertNull(creator.semaphore("semaphore1", 1, true, false)); + assertNull(other.semaphore("semaphore1", 1, true, false)); + } + + /** + * @throws Exception If failed. + */ public void testQueue() throws Exception { Ignite clientNode = clientIgnite(); Ignite srvNode = serverNode(); @@ -343,4 +400,4 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA return ignite; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java index f5305a2..4a21765 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteDataStructureUniqueNameTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.IgniteAtomicStamped; import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteSet; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMemoryMode; @@ -239,7 +240,7 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT private void testUniqueName(final boolean singleGrid) throws Exception { final String name = IgniteUuid.randomUuid().toString(); - final int DS_TYPES = 7; + final int DS_TYPES = 8; final int THREADS = DS_TYPES * 3; @@ -314,6 +315,12 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT break; + case 7: + log.info("Create atomic semaphore, grid: " + ignite.name()); + + res = ignite.semaphore(name, 0, false, true); + + break; default: fail(); @@ -352,7 +359,8 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT res instanceof IgniteAtomicStamped || res instanceof IgniteCountDownLatch || res instanceof IgniteQueue || - res instanceof IgniteSet); + res instanceof IgniteSet || + res instanceof IgniteSemaphore); log.info("Data structure created: " + dataStructure); @@ -371,4 +379,4 @@ public class IgniteDataStructureUniqueNameTest extends IgniteCollectionAbstractT dataStructure.close(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java new file mode 100644 index 0000000..e60aed3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSemaphore; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; +import org.junit.Rule; +import org.junit.rules.ExpectedException; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.ignite.cache.CacheMode.LOCAL; + +/** + * Cache semaphore self test. + */ +public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstractTest + implements Externalizable { + /** */ + private static final int NODES_CNT = 4; + + /** */ + protected static final int THREADS_CNT = 5; + + /** */ + private static final Random RND = new Random(); + + /** */ + @Rule + public final ExpectedException exception = ExpectedException.none(); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return NODES_CNT; + } + + /** + * @throws Exception If failed. + */ + public void testSemaphore() throws Exception { + checkSemaphore(); + } + + /** + * @throws Exception If failed. + */ + public void testFailover() throws Exception { + if (atomicsCacheMode() == LOCAL) + return; + + checkFailover(true); + checkFailover(false); + } + + /** + * @param failoverSafe Failover safe flag. + * @throws Exception + */ + private void checkFailover(boolean failoverSafe) throws Exception { + IgniteEx g = startGrid(NODES_CNT + 1); + + // For vars locality. + { + // Ensure not exists. + assert g.semaphore("sem", 2, failoverSafe, false) == null; + + IgniteSemaphore sem = g.semaphore( + "sem", + 2, + failoverSafe, + true); + + sem.acquire(2); + + assert !sem.tryAcquire(); + assertEquals( + 0, + sem.availablePermits()); + } + + Ignite g0 = grid(0); + + final IgniteSemaphore sem0 = g0.semaphore( + "sem", + -10, + false, + false); + + assert !sem0.tryAcquire(); + assertEquals(0, sem0.availablePermits()); + + IgniteInternalFuture fut = multithreadedAsync( + new Callable() { + @Override public Object call() throws Exception { + sem0.acquire(); + + info("Acquired in separate thread."); + + return null; + } + }, + 1); + + Thread.sleep(100); + + g.close(); + + try { + fut.get(500); + } + catch (IgniteCheckedException e) { + if (!failoverSafe && e.hasCause(InterruptedException.class)) + info("Ignored expected exception: " + e); + else + throw e; + } + + sem0.close(); + } + + /** + * @throws Exception If failed. + */ + private void checkSemaphore() throws Exception { + // Test API. + checkAcquire(); + + checkRelease(); + + checkFailoverSafe(); + + // Test main functionality. + IgniteSemaphore semaphore1 = grid(0).semaphore("semaphore", -2, true, true); + + assertEquals(-2, semaphore1.availablePermits()); + + IgniteCompute comp = grid(0).compute().withAsync(); + + comp.call(new IgniteCallable() { + @IgniteInstanceResource + private Ignite ignite; + + @LoggerResource + private IgniteLogger log; + + @Nullable @Override public Object call() throws Exception { + // Test semaphore in multiple threads on each node. + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync( + new Callable() { + @Nullable @Override public Object call() throws Exception { + IgniteSemaphore semaphore = ignite.semaphore("semaphore", -2, true, true); + + assert semaphore != null && semaphore.availablePermits() == -2; + + log.info("Thread is going to wait on semaphore: " + Thread.currentThread().getName()); + + assert semaphore.tryAcquire(1, 1, MINUTES); + + log.info("Thread is again runnable: " + Thread.currentThread().getName()); + + semaphore.release(); + + return null; + } + }, + 5, + "test-thread" + ); + + fut.get(); + + return null; + } + }); + + IgniteFuture fut = comp.future(); + + Thread.sleep(3000); + + semaphore1.release(2); + + assert semaphore1.availablePermits() == 0; + + semaphore1.release(1); + + // Ensure there are no hangs. + fut.get(); + + // Test operations on removed semaphore. + semaphore1.close(); + + checkRemovedSemaphore(semaphore1); + } + + /** + * @param semaphore Semaphore. + * @throws Exception If failed. + */ + protected void checkRemovedSemaphore(final IgniteSemaphore semaphore) throws Exception { + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return semaphore.removed(); + } + }, 5000); + + assert semaphore.removed(); + } + + /** + * This method only checks if parameter of new semaphore is initialized properly. + * For tests considering failure recovery see @GridCachePartitionedNodeFailureSelfTest. + * + * @throws Exception Exception. + */ + private void checkFailoverSafe() throws Exception { + // Checks only if semaphore is initialized properly + IgniteSemaphore semaphore = createSemaphore("rmv", 5, true); + + assert semaphore.isFailoverSafe(); + + removeSemaphore("rmv"); + + IgniteSemaphore semaphore1 = createSemaphore("rmv1", 5, false); + + assert !semaphore1.isFailoverSafe(); + + removeSemaphore("rmv1"); + } + + /** + * @throws Exception Exception. + */ + private void checkAcquire() throws Exception { + // Check only 'false' cases here. Successful await is tested over the grid. + IgniteSemaphore semaphore = createSemaphore("acquire", 5, false); + + assert !semaphore.tryAcquire(10); + assert !semaphore.tryAcquire(10, 10, MICROSECONDS); + + removeSemaphore("acquire"); + } + + /** + * @throws Exception Exception. + */ + private void checkRelease() throws Exception { + IgniteSemaphore semaphore = createSemaphore("release", 5, false); + + semaphore.release(); + assert semaphore.availablePermits() == 6; + + semaphore.release(2); + assert semaphore.availablePermits() == 8; + + assert semaphore.drainPermits() == 8; + assert semaphore.availablePermits() == 0; + + removeSemaphore("release"); + + checkRemovedSemaphore(semaphore); + + IgniteSemaphore semaphore2 = createSemaphore("release2", -5, false); + + semaphore2.release(); + + assert semaphore2.availablePermits() == -4; + + semaphore2.release(2); + + assert semaphore2.availablePermits() == -2; + + assert semaphore2.drainPermits() == -2; + + assert semaphore2.availablePermits() == 0; + + removeSemaphore("release2"); + + checkRemovedSemaphore(semaphore2); + } + + /** + * @param semaphoreName Semaphore name. + * @param numPermissions Initial number of permissions. + * @param failoverSafe Fairness flag. + * @return New semaphore. + * @throws Exception If failed. + */ + private IgniteSemaphore createSemaphore(String semaphoreName, int numPermissions, boolean failoverSafe) + throws Exception { + IgniteSemaphore semaphore = grid(RND.nextInt(NODES_CNT)).semaphore(semaphoreName, numPermissions, failoverSafe, true); + + // Test initialization. + assert semaphoreName.equals(semaphore.name()); + assert semaphore.availablePermits() == numPermissions; + assert semaphore.getQueueLength() == 0; + assert semaphore.isFailoverSafe() == failoverSafe; + + return semaphore; + } + + /** + * @param semaphoreName Semaphore name. + * @throws Exception If failed. + */ + private void removeSemaphore(String semaphoreName) + throws Exception { + IgniteSemaphore semaphore = grid(RND.nextInt(NODES_CNT)).semaphore(semaphoreName, 10, false, true); + + assert semaphore != null; + + if (semaphore.availablePermits() < 0) + semaphore.release(-semaphore.availablePermits()); + + // Remove semaphore on random node. + IgniteSemaphore semaphore0 = grid(RND.nextInt(NODES_CNT)).semaphore(semaphoreName, 0, false, true); + + assertNotNull(semaphore0); + + semaphore0.close(); + + // Ensure semaphore is removed on all nodes. + for (Ignite g : G.allGrids()) + assertNull(((IgniteKernal)g).context().dataStructures().semaphore(semaphoreName, 10, true, false)); + + checkRemovedSemaphore(semaphore); + } + + /** + * @throws Exception If failed. + */ + public void testSemaphoreMultinode1() throws Exception { + if (gridCount() == 1) + return; + + IgniteSemaphore semaphore = grid(0).semaphore("s1", 0, true, true); + + List> futs = new ArrayList<>(); + + for (int i = 0; i < gridCount(); i++) { + final Ignite ignite = grid(i); + + futs.add(GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + IgniteSemaphore semaphore = ignite.semaphore("s1", 0, true, false); + + assertNotNull(semaphore); + + boolean wait = semaphore.tryAcquire(30_000, MILLISECONDS); + + assertTrue(wait); + + return null; + } + })); + } + + for (int i = 0; i < 10; i++) + semaphore.release(); + + for (IgniteInternalFuture fut : futs) + fut.get(30_000); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java new file mode 100644 index 0000000..a516fc1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/local/IgniteLocalSemaphoreSelfTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures.local; + +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteSemaphore; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.datastructures.IgniteSemaphoreAbstractSelfTest; +import org.apache.ignite.testframework.GridTestUtils; +import org.jetbrains.annotations.Nullable; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.ignite.cache.CacheMode.LOCAL; + +/** + * + */ +public class IgniteLocalSemaphoreSelfTest extends IgniteSemaphoreAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode atomicsCacheMode() { + return LOCAL; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void testSemaphore() throws Exception { + // Test main functionality. + IgniteSemaphore semaphore = grid(0).semaphore("semaphore", -2, false, true); + + assertNotNull(semaphore); + + assertEquals(-2, semaphore.availablePermits()); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync( + new Callable() { + @Nullable @Override public Object call() throws Exception { + IgniteSemaphore semaphore = grid(0).semaphore("semaphore", -2, false, true); + + assert semaphore != null && semaphore.availablePermits() == -2; + + info("Thread is going to wait on semaphore: " + Thread.currentThread().getName()); + + assert semaphore.tryAcquire(1, 1, MINUTES); + + info("Thread is again runnable: " + Thread.currentThread().getName()); + + semaphore.release(); + + return null; + } + }, + THREADS_CNT, + "test-thread" + ); + + Thread.sleep(3000); + + assert semaphore.availablePermits() == -2; + + semaphore.release(2); + + assert semaphore.availablePermits() == 0; + + semaphore.release(); + + // Ensure there are no hangs. + fut.get(); + + // Test operations on removed latch. + IgniteSemaphore semaphore0 = grid(0).semaphore("semaphore", 0, false, false); + + assertNotNull(semaphore0); + + semaphore0.close(); + + checkRemovedSemaphore(semaphore0); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java new file mode 100644 index 0000000..c302cad --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedSemaphoreSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures.partitioned; + +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.internal.processors.cache.datastructures.IgniteSemaphoreAbstractSelfTest; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * + */ +public class IgnitePartitionedSemaphoreSelfTest extends IgniteSemaphoreAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode atomicsCacheMode() { + return PARTITIONED; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java index 69de7cd..d0131d6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedDataStructuresFailoverSelfTest.java @@ -50,4 +50,4 @@ public class GridCacheReplicatedDataStructuresFailoverSelfTest @Override protected CacheAtomicityMode collectionCacheAtomicityMode() { return TRANSACTIONAL; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java new file mode 100644 index 0000000..f58754f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/IgniteReplicatedSemaphoreSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures.replicated; + +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.internal.processors.cache.datastructures.IgniteSemaphoreAbstractSelfTest; + +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * + */ +public class IgniteReplicatedSemaphoreSelfTest extends IgniteSemaphoreAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode atomicsCacheMode() { + return REPLICATED; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java index 2fa8940..d4ca9a5 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/cache/GridCacheDataStructuresLoadTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCountDownLatch; import org.apache.ignite.IgniteQueue; +import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.Ignition; import org.apache.ignite.configuration.CollectionConfiguration; @@ -60,6 +61,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad /** Count down latch name. */ private static final String TEST_LATCH_NAME = "test-latch"; + /** Semaphore name. */ + private static final String TEST_SEMAPHORE_NAME = "test-semaphore"; + /** */ private static final CollectionConfiguration colCfg = new CollectionConfiguration(); @@ -69,6 +73,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad /** Count down latch initial count. */ private static final int LATCH_INIT_CNT = 1000; + /** Semaphore initial count. */ + private static final int SEMAPHORE_INIT_CNT = 1000; + /** */ private static final boolean LONG = false; @@ -88,6 +95,9 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad private static final boolean LATCH = true; /** */ + private static final boolean SEMAPHORE = true; + + /** */ private GridCacheDataStructuresLoadTest() { // No-op } @@ -95,210 +105,247 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad /** Atomic long write closure. */ private final CIX1 longWriteClos = new CIX1() { - @Override public void applyx(Ignite ignite) { - IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true); + @Override public void applyx(Ignite ignite) { + IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true); - for (int i = 0; i < operationsPerTx; i++) { - al.addAndGet(RAND.nextInt(MAX_INT)); + for (int i = 0; i < operationsPerTx; i++) { + al.addAndGet(RAND.nextInt(MAX_INT)); - long cnt = writes.incrementAndGet(); + long cnt = writes.incrementAndGet(); - if (cnt % WRITE_LOG_MOD == 0) - info("Performed " + cnt + " writes."); + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes."); + } } - } - }; + }; /** Atomic long read closure. */ private final CIX1 longReadClos = new CIX1() { - @Override public void applyx(Ignite ignite) { - IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true); + @Override public void applyx(Ignite ignite) { + IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true); - for (int i = 0; i < operationsPerTx; i++) { - al.get(); + for (int i = 0; i < operationsPerTx; i++) { + al.get(); - long cnt = reads.incrementAndGet(); + long cnt = reads.incrementAndGet(); - if (cnt % READ_LOG_MOD == 0) - info("Performed " + cnt + " reads."); + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads."); + } } - } - }; + }; /** Atomic reference write closure. */ private final CIX1 refWriteClos = new CIX1() { - @Override public void applyx(Ignite ignite) { - IgniteAtomicReference ar = ignite.atomicReference(TEST_REF_NAME, - null, true); + @Override public void applyx(Ignite ignite) { + IgniteAtomicReference ar = ignite.atomicReference(TEST_REF_NAME, + null, true); - for (int i = 0; i < operationsPerTx; i++) { - ar.set(RAND.nextInt(MAX_INT)); + for (int i = 0; i < operationsPerTx; i++) { + ar.set(RAND.nextInt(MAX_INT)); - long cnt = writes.incrementAndGet(); + long cnt = writes.incrementAndGet(); - if (cnt % WRITE_LOG_MOD == 0) - info("Performed " + cnt + " writes."); + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes."); + } } - } - }; + }; /** Atomic reference read closure. */ private final CIX1 refReadClos = new CIX1() { - @Override public void applyx(Ignite ignite) { - IgniteAtomicReference ar = ignite.atomicReference(TEST_REF_NAME, null, - true); + @Override public void applyx(Ignite ignite) { + IgniteAtomicReference ar = ignite.atomicReference(TEST_REF_NAME, null, + true); - for (int i = 0; i < operationsPerTx; i++) { - ar.get(); + for (int i = 0; i < operationsPerTx; i++) { + ar.get(); - long cnt = reads.incrementAndGet(); + long cnt = reads.incrementAndGet(); - if (cnt % READ_LOG_MOD == 0) - info("Performed " + cnt + " reads."); + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads."); + } } - } - }; + }; /** Atomic sequence write closure. */ private final CIX1 seqWriteClos = new CIX1() { - @Override public void applyx(Ignite ignite) { - IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true); + @Override public void applyx(Ignite ignite) { + IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true); - for (int i = 0; i < operationsPerTx; i++) { - as.addAndGet(RAND.nextInt(MAX_INT) + 1); + for (int i = 0; i < operationsPerTx; i++) { + as.addAndGet(RAND.nextInt(MAX_INT) + 1); - long cnt = writes.incrementAndGet(); + long cnt = writes.incrementAndGet(); - if (cnt % WRITE_LOG_MOD == 0) - info("Performed " + cnt + " writes."); + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes."); + } } - } - }; + }; /** Atomic sequence read closure. */ private final CIX1 seqReadClos = new CIX1() { - @Override public void applyx(Ignite ignite) { - IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true); + @Override public void applyx(Ignite ignite) { + IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true); - for (int i = 0; i < operationsPerTx; i++) { - as.get(); + for (int i = 0; i < operationsPerTx; i++) { + as.get(); - long cnt = reads.incrementAndGet(); + long cnt = reads.incrementAndGet(); - if (cnt % READ_LOG_MOD == 0) - info("Performed " + cnt + " reads."); + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads."); + } } - } - }; + }; /** Atomic stamped write closure. */ private final CIX1 stampWriteClos = new CIX1() { - @Override public void applyx(Ignite ignite) { - IgniteAtomicStamped as = ignite.atomicStamped(TEST_STAMP_NAME, - 0, 0, true); + @Override public void applyx(Ignite ignite) { + IgniteAtomicStamped as = ignite.atomicStamped(TEST_STAMP_NAME, + 0, 0, true); - for (int i = 0; i < operationsPerTx; i++) { - as.set(RAND.nextInt(MAX_INT), RAND.nextInt(MAX_INT)); + for (int i = 0; i < operationsPerTx; i++) { + as.set(RAND.nextInt(MAX_INT), RAND.nextInt(MAX_INT)); - long cnt = writes.incrementAndGet(); + long cnt = writes.incrementAndGet(); - if (cnt % WRITE_LOG_MOD == 0) - info("Performed " + cnt + " writes."); + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes."); + } } - } - }; + }; /** Atomic stamped read closure. */ private final CIX1 stampReadClos = new CIX1() { - @Override public void applyx(Ignite ignite) { - IgniteAtomicStamped as = ignite.atomicStamped(TEST_STAMP_NAME, - 0, 0, true); + @Override public void applyx(Ignite ignite) { + IgniteAtomicStamped as = ignite.atomicStamped(TEST_STAMP_NAME, + 0, 0, true); - for (int i = 0; i < operationsPerTx; i++) { - as.get(); + for (int i = 0; i < operationsPerTx; i++) { + as.get(); - long cnt = reads.incrementAndGet(); + long cnt = reads.incrementAndGet(); - if (cnt % READ_LOG_MOD == 0) - info("Performed " + cnt + " reads."); + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads."); + } } - } - }; + }; /** Queue write closure. */ private final CIX1 queueWriteClos = new CIX1() { - @Override public void applyx(Ignite ignite) { - IgniteQueue q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg); + @Override public void applyx(Ignite ignite) { + IgniteQueue q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg); - for (int i = 0; i < operationsPerTx; i++) { - q.put(RAND.nextInt(MAX_INT)); + for (int i = 0; i < operationsPerTx; i++) { + q.put(RAND.nextInt(MAX_INT)); - long cnt = writes.incrementAndGet(); + long cnt = writes.incrementAndGet(); - if (cnt % WRITE_LOG_MOD == 0) - info("Performed " + cnt + " writes."); + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes."); + } } - } - }; + }; /** Queue read closure. */ private final CIX1 queueReadClos = new CIX1() { - @Override public void applyx(Ignite ignite) { - IgniteQueue q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg); + @Override public void applyx(Ignite ignite) { + IgniteQueue q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg); - for (int i = 0; i < operationsPerTx; i++) { - q.peek(); + for (int i = 0; i < operationsPerTx; i++) { + q.peek(); - long cnt = reads.incrementAndGet(); + long cnt = reads.incrementAndGet(); - if (cnt % READ_LOG_MOD == 0) - info("Performed " + cnt + " reads."); + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads."); + } } - } - }; + }; /** Count down latch write closure. */ private final CIX1 latchWriteClos = new CIX1() { - @Override public void applyx(Ignite ignite) { - IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true); + @Override public void applyx(Ignite ignite) { + IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true); - for (int i = 0; i < operationsPerTx; i++) { - l.countDown(); + for (int i = 0; i < operationsPerTx; i++) { + l.countDown(); - long cnt = writes.incrementAndGet(); + long cnt = writes.incrementAndGet(); - if (cnt % WRITE_LOG_MOD == 0) - info("Performed " + cnt + " writes."); + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes."); + } } - } - }; + }; /** Count down latch read closure. */ private final CIX1 latchReadClos = new CIX1() { - @Override public void applyx(Ignite ignite) { - IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true); + @Override public void applyx(Ignite ignite) { + IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true); - for (int i = 0; i < operationsPerTx; i++) { - l.count(); + for (int i = 0; i < operationsPerTx; i++) { + l.count(); - long cnt = reads.incrementAndGet(); + long cnt = reads.incrementAndGet(); - if (cnt % READ_LOG_MOD == 0) - info("Performed " + cnt + " reads."); + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads."); + } } - } - }; + }; + + /** Semaphore write closure. */ + private final CIX1 semaphoreWriteClos = + new CIX1() { + @Override public void applyx(Ignite ignite) { + IgniteSemaphore s = ignite.semaphore(TEST_SEMAPHORE_NAME, SEMAPHORE_INIT_CNT, false, true); + + for (int i = 0; i < operationsPerTx; i++) { + if ((i % 2) == 0) + s.release(); + else + s.acquire(); + + long cnt = writes.incrementAndGet(); + + if (cnt % WRITE_LOG_MOD == 0) + info("Performed " + cnt + " writes."); + } + } + }; + + /** Semaphore read closure. */ + private final CIX1 semaphoreReadClos = + new CIX1() { + @Override public void applyx(Ignite ignite) { + IgniteSemaphore s = ignite.semaphore(TEST_SEMAPHORE_NAME, SEMAPHORE_INIT_CNT, false, true); + + for (int i = 0; i < operationsPerTx; i++) { + s.availablePermits(); + + long cnt = reads.incrementAndGet(); + + if (cnt % READ_LOG_MOD == 0) + info("Performed " + cnt + " reads."); + } + } + }; /** * @param args Arguments. @@ -362,6 +409,14 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad test.loadTestIgnite(test.latchWriteClos, test.latchReadClos); } + + System.gc(); + + if (SEMAPHORE) { + info("Testing semaphore..."); + + test.loadTestIgnite(test.semaphoreWriteClos, test.semaphoreReadClos); + } } } @@ -407,7 +462,7 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad @Nullable @Override public Object call() throws Exception { long start = System.currentTimeMillis(); - while(!done.get()) { + while (!done.get()) { if (tx) { try (Transaction tx = ignite.transactions().txStart()) { readClos.apply(ignite); @@ -447,4 +502,4 @@ public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoad throw new RuntimeException(e); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java index bfeafdf..1940077 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java @@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCluster; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteEvents; import org.apache.ignite.IgniteFileSystem; @@ -313,6 +314,15 @@ public class IgniteMock implements Ignite { } /** {@inheritDoc} */ + @Nullable @Override public IgniteSemaphore semaphore(String name, + int cnt, + boolean failoverSafe, + boolean create) + { + return null; + } + + /** {@inheritDoc} */ @Nullable @Override public IgniteQueue queue(String name, int cap, CollectionConfiguration cfg) http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java index 3eb9d98..45b82ad 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java @@ -34,6 +34,7 @@ import org.apache.ignite.IgniteAtomicStamped; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteCountDownLatch; +import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteEvents; import org.apache.ignite.IgniteException; @@ -535,6 +536,12 @@ public class IgniteProcessProxy implements IgniteEx { } /** {@inheritDoc} */ + @Override public IgniteSemaphore semaphore(String name, int cnt, boolean failoverSafe, + boolean create) throws IgniteException { + throw new UnsupportedOperationException("Operation isn't supported yet."); + } + + /** {@inheritDoc} */ @Override public IgniteQueue queue(String name, int cap, @Nullable CollectionConfiguration cfg) throws IgniteException { throw new UnsupportedOperationException("Operation isn't supported yet."); http://git-wip-us.apache.org/repos/asf/ignite/blob/8e7e3309/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java index 7740907..c5cde89 100644 --- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java +++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java @@ -406,6 +406,18 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea } /** {@inheritDoc} */ + @Nullable @Override public IgniteSemaphore semaphore(String name, + int cnt, + boolean failoverSafe, + boolean create) + { + assert g != null; + + return g.semaphore(name, cnt, + failoverSafe, create); + } + + /** {@inheritDoc} */ @Nullable @Override public IgniteQueue queue(String name, int cap, CollectionConfiguration cfg)