Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4F3CC1833A for ; Wed, 24 Feb 2016 05:59:30 +0000 (UTC) Received: (qmail 88206 invoked by uid 500); 24 Feb 2016 05:59:30 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 88061 invoked by uid 500); 24 Feb 2016 05:59:30 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 87913 invoked by uid 99); 24 Feb 2016 05:59:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Feb 2016 05:59:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E4312E8EA3; Wed, 24 Feb 2016 05:59:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Wed, 24 Feb 2016 05:59:36 -0000 Message-Id: In-Reply-To: <00fbff10a4d3446dbada907e7f7ef581@git.apache.org> References: <00fbff10a4d3446dbada907e7f7ef581@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/10] ignite git commit: IGNITE-1144 Add affinity functions to collocated Ignite Queue and Set. Fixes #459. IGNITE-1144 Add affinity functions to collocated Ignite Queue and Set. Fixes #459. 
Signed-off-by: Valentin Kulichenko Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eb5bce29 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eb5bce29 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eb5bce29 Branch: refs/heads/ignite-1232 Commit: eb5bce2952ec05d4bd49a99fa56d1e979d4436ce Parents: bc8d73e Author: Oddo Da Authored: Tue Feb 23 17:03:16 2016 -0800 Committer: Valentin Kulichenko Committed: Tue Feb 23 17:03:16 2016 -0800 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteQueue.java | 13 ++- .../main/java/org/apache/ignite/IgniteSet.java | 12 +- .../datastructures/GridCacheQueueAdapter.java | 45 ++++---- .../datastructures/GridCacheQueueProxy.java | 66 ++--------- .../datastructures/GridCacheSetImpl.java | 44 ++++---- .../datastructures/GridCacheSetProxy.java | 58 ++-------- .../GridCacheQueueApiSelfAbstractTest.java | 111 ++++++++++++++----- .../GridCacheSetAbstractSelfTest.java | 103 ++++++++++++++++- 8 files changed, 269 insertions(+), 183 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java index 3b892c2..2dc38e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java @@ -22,9 +22,8 @@ import java.util.Collection; import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; -import org.apache.ignite.lang.IgniteAsyncSupported; -import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteCallable; +import 
org.apache.ignite.lang.IgniteRunnable; /** * This interface provides a rich API for working with distributed queues based on In-Memory Data Grid. @@ -187,8 +186,10 @@ public interface IgniteQueue extends BlockingQueue, Closeable { /** * Executes given job on collocated queue on the node where the queue is located * (a.k.a. affinity co-location). + * <p>
+ * This is not supported for non-collocated queues. * - * @param job Job which will be co-located on the node with given affinity key. + * @param job Job which will be co-located with the queue. * @throws IgniteException If job failed. */ public void affinityRun(IgniteRunnable job) throws IgniteException; @@ -196,9 +197,11 @@ public interface IgniteQueue extends BlockingQueue, Closeable { /** * Executes given job on collocated queue on the node where the queue is located * (a.k.a. affinity co-location). + * <p>
+ * This is not supported for non-collocated queues. * - * @param job Job which will be co-located on the node with given affinity key. + * @param job Job which will be co-located with the queue. * @throws IgniteException If job failed. */ public R affinityCall(IgniteCallable job) throws IgniteException; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/IgniteSet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java index ea32186..d72d9fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java @@ -21,8 +21,6 @@ import java.io.Closeable; import java.util.Collection; import java.util.Iterator; import java.util.Set; - -import org.apache.ignite.lang.IgniteAsyncSupported; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteRunnable; @@ -111,8 +109,10 @@ public interface IgniteSet extends Set, Closeable { /** * Executes given job on collocated set on the node where the set is located * (a.k.a. affinity co-location). + * <p>
+ * This is not supported for non-collocated sets. * - * @param job Job which will be co-located on the node with given affinity key. + * @param job Job which will be co-located with the set. * @throws IgniteException If job failed. */ public void affinityRun(IgniteRunnable job) throws IgniteException; @@ -120,9 +120,11 @@ public interface IgniteSet extends Set, Closeable { /** * Executes given job on collocated set on the node where the set is located * (a.k.a. affinity co-location). + * <p>
+ * This is not supported for non-collocated sets. * - * @param job Job which will be co-located on the node with given affinity key. + * @param job Job which will be co-located with the set. * @throws IgniteException If job failed. */ public R affinityCall(IgniteCallable job) throws IgniteException; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java index 4be7413..caf3ba3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import javax.cache.processor.EntryProcessor; import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; @@ -44,13 +45,11 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteAsyncSupported; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import org.apache.ignite.lang.IgniteRunnable; 
-import org.apache.ignite.lang.IgniteCallable; -import org.apache.ignite.IgniteCompute; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -99,7 +98,7 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp @GridToStringExclude private final Semaphore writeSem; - /** access to affinityRun() and affinityCall() functions */ + /** Access to affinityRun() and affinityCall() functions. */ private final IgniteCompute compute; /** */ @@ -411,6 +410,24 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp return rmvd; } + /** {@inheritDoc} */ + public void affinityRun(IgniteRunnable job) { + if (!collocated) + throw new IgniteException("Failed to execute affinityRun() for non-collocated queue: " + name() + + ". This operation is supported only for collocated queues."); + + compute.affinityRun(cache.name(), queueKey, job); + } + + /** {@inheritDoc} */ + public R affinityCall(IgniteCallable job) { + if (!collocated) + throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() + + ". This operation is supported only for collocated queues."); + + return compute.affinityCall(cache.name(), queueKey, job); + } + /** * @param cache Queue cache. * @param id Queue unique ID. @@ -1071,22 +1088,4 @@ public abstract class GridCacheQueueAdapter extends AbstractCollection imp @Override public String toString() { return S.toString(GridCacheQueueAdapter.class, this); } - - /** {@inheritDoc} */ - public void affinityRun(IgniteRunnable job) { - if (!collocated) - throw new IgniteException("Failed to execute affinityRun() for non-collocated queue: " + name() + - ". This operation is supported only for collocated queues."); - - compute.affinityRun(cache.name(),queueKey,job); - } - - /** {@inheritDoc} */ - public R affinityCall(IgniteCallable job) { - if (!collocated) - throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() + - ". 
This operation is supported only for collocated queues."); - - return compute.affinityCall(cache.name(),queueKey,job); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java index 93abbe6..c869743 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java @@ -28,8 +28,6 @@ import java.util.Iterator; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteCompute; -import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteQueue; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -726,6 +724,16 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { } /** {@inheritDoc} */ + @Override public void affinityRun(final IgniteRunnable job) { + delegate.affinityRun(job); + } + + /** {@inheritDoc} */ + @Override public R affinityCall(final IgniteCallable job) { + return delegate.affinityCall(job); + } + + /** {@inheritDoc} */ @Override public int hashCode() { return delegate.hashCode(); } @@ -781,56 +789,4 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { @Override public String toString() { return delegate.toString(); } - - /** {@inheritDoc} */ - @Override public void affinityRun(final IgniteRunnable job) { - gate.enter(); - - try { - if (cctx.transactional()) { - 
CU.outTx(new Callable() { - @Override - public Void call() throws Exception { - if (!delegate.collocated()) - throw new IgniteException("Failed to execute affinityRun() for non-collocated queue: " + name() + - ". This operation is supported only for collocated queues."); - delegate.affinityRun(job); - return null; - } - }, cctx); - } else - delegate.affinityRun(job); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - gate.leave(); - } - } - - /** {@inheritDoc} */ - @Override public R affinityCall(final IgniteCallable job) { - gate.enter(); - - try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public R call() throws Exception { - if (!delegate.collocated()) - throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() + - ". This operation is supported only for collocated queues."); - return delegate.affinityCall(job); - } - }, cctx); - - return delegate.affinityCall(job); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - gate.leave(); - } - } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index 3f03380..5b74992 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -81,10 +81,10 @@ public class GridCacheSetImpl extends AbstractCollection implements Ignite /** Collocation flag. 
*/ private final boolean collocated; - /** Queue header partition. */ + /** Set header partition. */ private final int hdrPart; - /** Queue header key. */ + /** Set header key. */ protected final GridCacheSetHeaderKey setKey; /** Removed flag. */ @@ -93,7 +93,7 @@ public class GridCacheSetImpl extends AbstractCollection implements Ignite /** */ private final boolean binaryMarsh; - /** access to affinityRun() and affinityCall() functions */ + /** Access to affinityRun() and affinityCall() functions. */ private final IgniteCompute compute; /** @@ -374,6 +374,24 @@ public class GridCacheSetImpl extends AbstractCollection implements Ignite } /** {@inheritDoc} */ + public void affinityRun(IgniteRunnable job) { + if (!collocated) + throw new IgniteException("Failed to execute affinityRun() for non-collocated set: " + name() + + ". This operation is supported only for collocated sets."); + + compute.affinityRun(cache.name(), setKey, job); + } + + /** {@inheritDoc} */ + public R affinityCall(IgniteCallable job) { + if (!collocated) + throw new IgniteException("Failed to execute affinityCall() for non-collocated set: " + name() + + ". This operation is supported only for collocated sets."); + + return compute.affinityCall(cache.name(), setKey, job); + } + + /** {@inheritDoc} */ @Override public void close() { try { if (rmvd) @@ -547,24 +565,6 @@ public class GridCacheSetImpl extends AbstractCollection implements Ignite return S.toString(GridCacheSetImpl.class, this); } - /** {@inheritDoc} */ - public void affinityRun(IgniteRunnable job) { - if (!collocated) - throw new IgniteException("Failed to execute affinityCall() for non-collocated set: " + name() + - ". This operation is supported only for collocated sets."); - - compute.affinityRun(cache.name(),setKey,job); - } - - /** {@inheritDoc} */ - public R affinityCall(IgniteCallable job) { - if (!collocated) - throw new IgniteException("Failed to execute affinityCall() for non-collocated queue: " + name() + - ". 
This operation is supported only for collocated queues."); - - return compute.affinityCall(cache.name(),setKey,job); - } - /** * */ @@ -655,4 +655,4 @@ public class GridCacheSetImpl extends AbstractCollection implements Ignite setName = U.readString(in); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java index d40c286..219bb4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java @@ -526,6 +526,16 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { return delegate.removed(); } + /** {@inheritDoc} */ + @Override public void affinityRun(final IgniteRunnable job) { + delegate.affinityRun(job); + } + + /** {@inheritDoc} */ + @Override public R affinityCall(final IgniteCallable job) { + return delegate.affinityCall(job); + } + /** * Enters busy state. 
*/ @@ -590,52 +600,6 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { t.set2(U.readString(in)); } - /** {@inheritDoc} */ - @Override public void affinityRun(final IgniteRunnable job) { - gate.enter(); - - try { - if (cctx.transactional()) - CU.outTx(new Callable() { - @Override - public Void call() throws Exception { - delegate.affinityRun(job); - return null; - } - }, cctx); - else - delegate.affinityRun(job); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - gate.leave(); - } - } - - /** {@inheritDoc} */ - @Override public R affinityCall(final IgniteCallable job) { - gate.enter(); - - try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public R call() throws Exception { - return delegate.affinityCall(job); - } - }, cctx); - - return delegate.affinityCall(job); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - finally { - gate.leave(); - } - } - /** * Reconstructs object on unmarshalling. 
* @@ -660,4 +624,4 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { @Override public String toString() { return delegate.toString(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java index 0c76595..f9499a1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java @@ -28,14 +28,15 @@ import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteQueue; import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.CollectionConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.testframework.GridTestUtils; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; @@ -608,40 +609,100 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection 
assertNotNull(((IgniteKernal)grid(0)).internalCache(ccfg.getName())); } + /** + * @throws Exception If failed. + */ public void testAffinityRun() throws Exception { - /** Test exception on non-collocated queue */ final CollectionConfiguration colCfg = collectionConfiguration(); + colCfg.setCollocated(false); colCfg.setCacheMode(CacheMode.PARTITIONED); - final IgniteQueue queue = grid(0).queue("Queue1", 0, colCfg); + try (final IgniteQueue queue1 = grid(0).queue("Queue1", 0, colCfg)) { + GridTestUtils.assertThrows( + log, + new Callable() { + @Override public Void call() throws Exception { + queue1.affinityRun(new IgniteRunnable() { + @Override public void run() { + // No-op. + } + }); + + return null; + } + }, + IgniteException.class, + "Failed to execute affinityRun() for non-collocated queue: " + queue1.name() + + ". This operation is supported only for collocated queues."); + } - GridTestUtils.assertThrows(log, new Callable() { - @Override public Void call() throws Exception { - queue.affinityRun(new IgniteRunnable() { - @Override public void run() { ; }}); - return null; - } - }, IgniteException.class, - "Failed to execute affinityRun() for non-collocated queue: " + queue.name() + - ". This operation is supported only for collocated queues."); + colCfg.setCollocated(true); + + try (final IgniteQueue queue2 = grid(0).queue("Queue2", 0, colCfg)) { + queue2.add(100); + + queue2.affinityRun(new IgniteRunnable() { + @IgniteInstanceResource + private IgniteEx ignite; + + @Override public void run() { + assertTrue(ignite.cachex("datastructures_0").affinity().isPrimaryOrBackup( + ignite.cluster().localNode(), "Queue2")); + + assertEquals(100, queue2.take().intValue()); + } + }); + } + } - queue.close(); + /** + * @throws Exception If failed. 
+ */ + public void testAffinityCall() throws Exception { + final CollectionConfiguration colCfg = collectionConfiguration(); + + colCfg.setCollocated(false); + colCfg.setCacheMode(CacheMode.PARTITIONED); + + try (final IgniteQueue queue1 = grid(0).queue("Queue1", 0, colCfg)) { + GridTestUtils.assertThrows( + log, + new Callable() { + @Override public Void call() throws Exception { + queue1.affinityCall(new IgniteCallable() { + @Override public Object call() { + return null; + } + }); + + return null; + } + }, + IgniteException.class, + "Failed to execute affinityCall() for non-collocated queue: " + queue1.name() + + ". This operation is supported only for collocated queues."); + } - /** Test running a job on a collocated queue */ colCfg.setCollocated(true); - final IgniteQueue queue2 = grid(0).queue("Queue2", 0, colCfg); - /** add a number to the queue */ - queue2.add(100); + try (final IgniteQueue queue2 = grid(0).queue("Queue2", 0, colCfg)) { + queue2.add(100); - /** read the number back using affinityRun() */ - queue2.affinityRun(new IgniteRunnable() { - @Override public void run() { - assert((int)queue2.take() == 100); - } - }); - queue2.close(); + Integer res = queue2.affinityCall(new IgniteCallable() { + @IgniteInstanceResource + private IgniteEx ignite; + + @Override public Integer call() { + assertTrue(ignite.cachex("datastructures_0").affinity().isPrimaryOrBackup( + ignite.cluster().localNode(), "Queue2")); + + return queue2.take(); + } + }); + + assertEquals(100, res.intValue()); + } } /** @@ -683,4 +744,4 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/eb5bce29/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java index 7c0cc30..c63df40 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java @@ -30,8 +30,11 @@ import java.util.concurrent.atomic.AtomicInteger; import junit.framework.AssertionFailedError; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSet; +import org.apache.ignite.cache.CacheMode; import org.apache.ignite.configuration.CollectionConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -40,6 +43,8 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.testframework.GridTestUtils; import static org.apache.ignite.cache.CacheMode.LOCAL; @@ -846,6 +851,102 @@ public abstract class GridCacheSetAbstractSelfTest extends IgniteCollectionAbstr } /** + * @throws Exception If failed. 
+ */ + public void testAffinityRun() throws Exception { + final CollectionConfiguration colCfg = collectionConfiguration(); + + colCfg.setCollocated(false); + colCfg.setCacheMode(CacheMode.PARTITIONED); + + try (final IgniteSet set1 = grid(0).set("Set1", colCfg)) { + GridTestUtils.assertThrows( + log, + new Callable() { + @Override public Void call() throws Exception { + set1.affinityRun(new IgniteRunnable() { + @Override public void run() { + // No-op. + } + }); + + return null; + } + }, + IgniteException.class, + "Failed to execute affinityRun() for non-collocated set: " + set1.name() + + ". This operation is supported only for collocated sets."); + } + + colCfg.setCollocated(true); + + try (final IgniteSet set2 = grid(0).set("Set2", colCfg)) { + set2.add(100); + + set2.affinityRun(new IgniteRunnable() { + @IgniteInstanceResource + private IgniteEx ignite; + + @Override public void run() { + assertTrue(ignite.cachex("datastructures_0").affinity().isPrimaryOrBackup( + ignite.cluster().localNode(), "Set2")); + + assertEquals(100, set2.iterator().next().intValue()); + } + }); + } + } + + /** + * @throws Exception If failed. + */ + public void testAffinityCall() throws Exception { + final CollectionConfiguration colCfg = collectionConfiguration(); + + colCfg.setCollocated(false); + colCfg.setCacheMode(CacheMode.PARTITIONED); + + try (final IgniteSet set1 = grid(0).set("Set1", colCfg)) { + GridTestUtils.assertThrows( + log, + new Callable() { + @Override public Void call() throws Exception { + set1.affinityCall(new IgniteCallable() { + @Override public Object call() { + return null; + } + }); + + return null; + } + }, + IgniteException.class, + "Failed to execute affinityCall() for non-collocated set: " + set1.name() + + ". 
This operation is supported only for collocated sets."); + } + + colCfg.setCollocated(true); + + try (final IgniteSet set2 = grid(0).set("Set2", colCfg)) { + set2.add(100); + + Integer res = set2.affinityCall(new IgniteCallable() { + @IgniteInstanceResource + private IgniteEx ignite; + + @Override public Integer call() { + assertTrue(ignite.cachex("datastructures_0").affinity().isPrimaryOrBackup( + ignite.cluster().localNode(), "Set2")); + + return set2.iterator().next(); + } + }); + + assertEquals(100, res.intValue()); + } + } + + /** * @param set Set. * @param size Expected size. */ @@ -882,4 +983,4 @@ public abstract class GridCacheSetAbstractSelfTest extends IgniteCollectionAbstr assertTrue(found); } } -} \ No newline at end of file +}