ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [16/53] [abbrv] ignite git commit: IGNITE-2310 Lock cache partition for affinityRun/affinityCall execution
Date Fri, 19 Aug 2016 11:48:33 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTest.java
new file mode 100644
index 0000000..168b25c
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTest.java
@@ -0,0 +1,852 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJobMasterLeaveAware;
+import org.apache.ignite.compute.ComputeTaskSession;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Test to validate https://issues.apache.org/jira/browse/IGNITE-2310
+ */
+public class IgniteCacheLockPartitionOnAffinityRunTest extends IgniteCacheLockPartitionOnAffinityRunAbstractTest
{
+    /**
+     * Counts the Person entries of an organization from the local partition map after asserting that exactly one
+     * matching Organization entry is found by {@code getOrganizationCountFromPartitionMap}.
+     *
+     * @param ignite Ignite.
+     * @param log Logger.
+     * @param orgId Organization id.
+     * @return Count of found Person objects with the specified orgId.
+     * @throws Exception If failed.
+     */
+    private static int getPersonsCountFromPartitionMapCheckBothCaches(final IgniteEx ignite, IgniteLogger log,
+        int orgId) throws Exception {
+        // The organization itself must be present exactly once before persons are counted.
+        assertEquals(1, getOrganizationCountFromPartitionMap(ignite, log, orgId));
+
+        return getPersonsCountFromPartitionMap(ignite, log, orgId);
+    }
+
+    /**
+     * Counts the Organization entries with the given id by iterating the entries of the local partition of the
+     * Organization cache that the id maps to.
+     *
+     * @param ignite Ignite.
+     * @param log Logger.
+     * @param orgId Organization id.
+     * @return Count of found Organization objects with the specified orgId.
+     * @throws Exception If failed.
+     */
+    private static int getOrganizationCountFromPartitionMap(final IgniteEx ignite, IgniteLogger log,
+        int orgId) throws Exception {
+        int part = ignite.affinity(Organization.class.getSimpleName()).partition(orgId);
+
+        GridCacheAdapter<?, ?> cacheAdapterOrg = ignite.context().cache()
+            .internalCache(Organization.class.getSimpleName());
+
+        // The partition is looked up without creating it (last argument is false).
+        GridDhtLocalPartition pOrgs = cacheAdapterOrg.context().topology()
+            .localPartition(part, AffinityTopologyVersion.NONE, false);
+
+        int cnt = 0;
+
+        for (GridCacheMapEntry e : pOrgs.entries()) {
+            Organization org = e.val.value(ignite.context().cacheObjects().contextForCache(
+                cacheAdapterOrg.cacheCfg), false);
+
+            if (org != null && org.getId() == orgId)
+                ++cnt;
+        }
+
+        return cnt;
+    }
+
+    /**
+     * Counts the Person entries of the given organization by iterating the entries of a local partition of the
+     * Person cache. The partition number is resolved through the Organization cache affinity for {@code orgId}.
+     * NOTE(review): this presumes Person keys are collocated with their Organization - confirm against the
+     * cache configuration in the abstract test.
+     *
+     * @param ignite Ignite.
+     * @param log Logger.
+     * @param orgId Organization id.
+     * @return Count of found Person objects with the specified orgId.
+     * @throws Exception If failed.
+     */
+    private static int getPersonsCountFromPartitionMap(final IgniteEx ignite, IgniteLogger log, int orgId)
+        throws Exception {
+        int part = ignite.affinity(Organization.class.getSimpleName()).partition(orgId);
+
+        GridCacheAdapter<?, ?> cacheAdapterPers = ignite.context().cache()
+            .internalCache(Person.class.getSimpleName());
+
+        // The partition is looked up without creating it (last argument is false).
+        GridDhtLocalPartition pPers = cacheAdapterPers.context().topology()
+            .localPartition(part, AffinityTopologyVersion.NONE, false);
+
+        int cnt = 0;
+
+        for (GridCacheMapEntry e : pPers.entries()) {
+            Person.Key k = (Person.Key)e.keyValue(false);
+            Person p = e.val.value(ignite.context().cacheObjects().contextForCache(
+                cacheAdapterPers.cacheCfg), false);
+
+            // Both the value and the key must reference the organization.
+            if (p != null && p.getOrgId() == orgId && k.orgId == orgId)
+                ++cnt;
+        }
+
+        return cnt;
+    }
+
+    /**
+     * Counts the persons of the given organization with a local {@link SqlFieldsQuery} over the Person cache.
+     * The organization id is bound as a query argument instead of being concatenated into the SQL text.
+     *
+     * @param ignite Ignite.
+     * @param log Logger.
+     * @param orgId Organization id.
+     * @return Count of found Person objects with the specified orgId.
+     */
+    private static int getPersonsCountBySqlFieldLocalQuery(final IgniteEx ignite, IgniteLogger log, int orgId) {
+        String cacheName = Person.class.getSimpleName();
+
+        // The cache name doubles as the quoted schema name in the statement.
+        SqlFieldsQuery qry = new SqlFieldsQuery(
+            String.format("SELECT p.id FROM \"%s\".Person as p WHERE p.orgId = ?", cacheName));
+
+        List<?> res = ignite.cache(cacheName)
+            .query(qry.setArgs(orgId).setLocal(true))
+            .getAll();
+
+        return res.size();
+    }
+
+    /**
+     * Counts the persons of the given organization with a local {@link SqlFieldsQuery} that joins the Person and
+     * Organization tables on the organization id. The organization id is bound as a query argument.
+     * The misspelt method name ("Fieled") is kept because other code in this class calls it by that name.
+     *
+     * @param ignite Ignite.
+     * @param log Logger.
+     * @param orgId Organization id.
+     * @return Count of found Person objects with the specified orgId.
+     */
+    private static int getPersonsCountBySqlFieledLocalQueryJoinOrgs(final IgniteEx ignite, IgniteLogger log,
+        int orgId) {
+        String persCacheName = Person.class.getSimpleName();
+        String orgCacheName = Organization.class.getSimpleName();
+
+        // The cache names double as the quoted schema names in the statement.
+        SqlFieldsQuery qry = new SqlFieldsQuery(
+            String.format("SELECT p.id FROM \"%s\".Person as p, \"%s\".Organization as o " +
+                    "WHERE p.orgId = o.id " +
+                    "AND p.orgId = ?",
+                persCacheName,
+                orgCacheName));
+
+        List<?> res = ignite.cache(persCacheName)
+            .query(qry.setArgs(orgId).setLocal(true))
+            .getAll();
+
+        return res.size();
+    }
+
+    /**
+     * Counts the persons of the given organization with a local {@link SqlQuery} over the Person cache.
+     *
+     * @param ignite Ignite.
+     * @param log Logger.
+     * @param orgId Organization id.
+     * @return Count of found Person objects with the specified orgId.
+     */
+    private static int getPersonsCountBySqlLocalQuery(final IgniteEx ignite, IgniteLogger log, int orgId) {
+        SqlQuery<Person.Key, Person> qry = new SqlQuery<>(Person.class, "orgId = ?");
+
+        List<?> res = ignite.cache(Person.class.getSimpleName())
+            .query(qry.setArgs(orgId).setLocal(true))
+            .getAll();
+
+        return res.size();
+    }
+
+    /**
+     * Counts the persons of the given organization with a local {@link ScanQuery} over the Person cache,
+     * filtering entries by the organization id stored in the Person value.
+     *
+     * @param ignite Ignite.
+     * @param log Logger.
+     * @param orgId Organization id.
+     * @return Count of found Person objects with the specified orgId.
+     */
+    private static int getPersonsCountByScanLocalQuery(final IgniteEx ignite, IgniteLogger log, final int orgId) {
+        IgniteBiPredicate<Person.Key, Person> sameOrg = new IgniteBiPredicate<Person.Key, Person>() {
+            @Override public boolean apply(Person.Key key, Person person) {
+                return person.getOrgId() == orgId;
+            }
+        };
+
+        List<?> res = ignite.cache(Person.class.getSimpleName())
+            .query(new ScanQuery<>(sameOrg).setLocal(true))
+            .getAll();
+
+        return res.size();
+    }
+
+    /**
+     * Counts the persons of the given organization in four ways (SQL query, SQL fields query, scan query and
+     * direct partition map iteration) and asserts that all of them agree and equal {@code PERS_AT_ORG_CNT}.
+     *
+     * @param ignite Ignite.
+     * @param log Logger.
+     * @param orgId Organization id.
+     * @return Count of found Person objects with the specified orgId (the partition map count).
+     * @throws Exception If failed.
+     */
+    private static int getPersonsCountSingleCache(final IgniteEx ignite, IgniteLogger log, final int orgId)
+        throws Exception {
+        int sqlCnt = getPersonsCountBySqlLocalQuery(ignite, log, orgId);
+        int sqlFieldCnt = getPersonsCountBySqlFieldLocalQuery(ignite, log, orgId);
+        int scanCnt = getPersonsCountByScanLocalQuery(ignite, log, orgId);
+        int partCnt = getPersonsCountFromPartitionMap(ignite, log, orgId);
+
+        // The partition map is the reference; every query flavour must report the same number.
+        assertEquals(PERS_AT_ORG_CNT, partCnt);
+        assertEquals(partCnt, sqlCnt);
+        assertEquals(partCnt, sqlFieldCnt);
+        assertEquals(partCnt, scanCnt);
+
+        return partCnt;
+    }
+
+    /**
+     * Counts the persons of the given organization with the local join query and with partition map iteration
+     * over both caches, and asserts that the two counts are equal.
+     *
+     * @param ignite Ignite.
+     * @param log Logger.
+     * @param orgId Organization id.
+     * @return Count of found Person objects with the specified orgId (the partition map count).
+     * @throws Exception If failed.
+     */
+    private static int getPersonsCountMultipleCache(final IgniteEx ignite, IgniteLogger log, final int orgId)
+        throws Exception {
+        int sqlFieldCnt = getPersonsCountBySqlFieledLocalQueryJoinOrgs(ignite, log, orgId);
+        int partCnt = getPersonsCountFromPartitionMapCheckBothCaches(ignite, log, orgId);
+
+        assertEquals(partCnt, sqlFieldCnt);
+
+        return partCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        // Workaround for initial update job metadata.
+        grid(0).compute().affinityCall(
+            Arrays.asList(Person.class.getSimpleName(), Organization.class.getSimpleName()),
+            0,
+            new TestAffinityCall(new PersonsCountGetter() {
+                @Override public int getPersonsCount(IgniteEx ignite, IgniteLogger log, int
orgId) throws Exception {
+                    return PERS_AT_ORG_CNT;
+                }
+            }, 0));
+
+        grid(0).compute().affinityRun(
+            Arrays.asList(Person.class.getSimpleName(), Organization.class.getSimpleName()),
+            0,
+            new TestAffinityRun(new PersonsCountGetter() {
+                @Override public int getPersonsCount(IgniteEx ignite, IgniteLogger log, int
orgId) throws Exception {
+                    return PERS_AT_ORG_CNT;
+                }
+            }, 0));
+    }
+
+    /**
+     * Runs affinityRun/affinityCall jobs against the Person cache from several threads while nodes are being
+     * restarted, until {@code endTime}; every job verifies the person count via
+     * {@code getPersonsCountSingleCache}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleCache() throws Exception {
+        final PersonsCountGetter personsCntGetter = new PersonsCountGetter() {
+            @Override public int getPersonsCount(IgniteEx ignite, IgniteLogger log, int orgId) throws Exception {
+                return getPersonsCountSingleCache(ignite, log, orgId);
+            }
+        };
+
+        // Run restart threads: start re-balancing.
+        beginNodesRestart();
+
+        IgniteInternalFuture<Long> affFut = null;
+
+        try {
+            final AtomicInteger threadNum = new AtomicInteger(0);
+
+            affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                @Override public void run() {
+                    // Threads alternate: even-numbered ones use affinityRun, odd-numbered ones affinityCall.
+                    final boolean useRun = threadNum.getAndIncrement() % 2 == 0;
+
+                    while (System.currentTimeMillis() < endTime) {
+                        for (final int orgId : orgIds) {
+                            if (System.currentTimeMillis() >= endTime)
+                                break;
+
+                            if (useRun) {
+                                grid(0).compute().affinityRun(Person.class.getSimpleName(),
+                                    new Person(0, orgId).createKey(),
+                                    new TestAffinityRun(personsCntGetter, orgId));
+                            }
+                            else {
+                                int personsCnt = grid(0).compute().affinityCall(Person.class.getSimpleName(),
+                                    new Person(0, orgId).createKey(),
+                                    new TestAffinityCall(personsCntGetter, orgId));
+
+                                assertEquals(PERS_AT_ORG_CNT, personsCnt);
+                            }
+                        }
+                    }
+                }
+            }, AFFINITY_THREADS_CNT, "affinity-run");
+        }
+        finally {
+            // Wait for the worker threads; get() rethrows a failure raised in any of them.
+            if (affFut != null)
+                affFut.get();
+        }
+    }
+
+    /**
+     * Runs affinityRun/affinityCall jobs that name both the Organization and Person caches from several threads
+     * while nodes are being restarted, until {@code endTime}; every job verifies the person count via
+     * {@code getPersonsCountMultipleCache}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultipleCaches() throws Exception {
+        final PersonsCountGetter personsCntGetter = new PersonsCountGetter() {
+            @Override public int getPersonsCount(IgniteEx ignite, IgniteLogger log, int orgId) throws Exception {
+                return getPersonsCountMultipleCache(ignite, log, orgId);
+            }
+        };
+
+        // Run restart threads: start re-balancing.
+        beginNodesRestart();
+
+        IgniteInternalFuture<Long> affFut = null;
+
+        try {
+            final AtomicInteger threadNum = new AtomicInteger(0);
+
+            affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                @Override public void run() {
+                    // Threads alternate: even-numbered ones use affinityRun, odd-numbered ones affinityCall.
+                    final boolean useRun = threadNum.getAndIncrement() % 2 == 0;
+
+                    while (System.currentTimeMillis() < endTime) {
+                        for (final int orgId : orgIds) {
+                            if (System.currentTimeMillis() >= endTime)
+                                break;
+
+                            // The key is boxed explicitly so that it is passed as an affinity key object.
+                            // NOTE(review): presumably this avoids an int-typed overload - confirm.
+                            if (useRun) {
+                                grid(0).compute().affinityRun(
+                                    Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                                    Integer.valueOf(orgId),
+                                    new TestAffinityRun(personsCntGetter, orgId));
+                            }
+                            else {
+                                int personsCnt = grid(0).compute().affinityCall(
+                                    Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                                    Integer.valueOf(orgId),
+                                    new TestAffinityCall(personsCntGetter, orgId));
+
+                                assertEquals(PERS_AT_ORG_CNT, personsCnt);
+                            }
+                        }
+                    }
+                }
+            }, AFFINITY_THREADS_CNT, "affinity-run");
+        }
+        finally {
+            // Wait for the worker threads; get() rethrows a failure raised in any of them.
+            if (affFut != null)
+                affFut.get();
+        }
+    }
+
+    /**
+     * Checks that affinityRun and affinityCall naming the Organization cache together with
+     * {@code OTHER_CACHE_NAME} fail with an exception whose message starts with
+     * "Failed partition reservation. Partition is not primary on the node.".
+     *
+     * @throws Exception If failed.
+     */
+    public void testCheckReservePartitionException() throws Exception {
+        int orgId = primaryKey(grid(1).cache(Organization.class.getSimpleName()));
+
+        try {
+            grid(0).compute().affinityRun(
+                Arrays.asList(Organization.class.getSimpleName(), OTHER_CACHE_NAME),
+                Integer.valueOf(orgId),
+                new IgniteRunnable() {
+                    @Override public void run() {
+                        // No-op.
+                    }
+                });
+
+            fail("Exception is expected");
+        }
+        catch (Exception e) {
+            String msg = e.getMessage();
+
+            // A null message fails the assertion instead of raising a NullPointerException.
+            assertTrue("Unexpected exception message: " + msg,
+                msg != null && msg.startsWith("Failed partition reservation. Partition is not primary on the node."));
+        }
+
+        try {
+            grid(0).compute().affinityCall(
+                Arrays.asList(Organization.class.getSimpleName(), OTHER_CACHE_NAME),
+                Integer.valueOf(orgId),
+                new IgniteCallable<Object>() {
+                    @Override public Object call() throws Exception {
+                        return null;
+                    }
+                });
+
+            fail("Exception is expected");
+        }
+        catch (Exception e) {
+            String msg = e.getMessage();
+
+            // A null message fails the assertion instead of raising a NullPointerException.
+            assertTrue("Unexpected exception message: " + msg,
+                msg != null && msg.startsWith("Failed partition reservation. Partition is not primary on the node."));
+        }
+    }
+
+    /**
+     * Runs an affinityRun and then an affinityCall job that complete normally. Inside each job
+     * {@code checkPartitionsReservations} is called with an expected value of 1, and after each job returns it
+     * is called on grid(1) with an expected value of 0.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReleasePartitionJobCompletesNormally() throws Exception {
+        final int orgId = primaryKey(grid(1).cache(Organization.class.getSimpleName()));
+
+        grid(0).compute().affinityRun(
+            Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+            Integer.valueOf(orgId),
+            new IgniteRunnable() {
+                @IgniteInstanceResource
+                IgniteEx ignite;
+
+                @Override public void run() {
+                    try {
+                        checkPartitionsReservations(ignite, orgId, 1);
+                    }
+                    catch (Exception e) {
+                        e.printStackTrace();
+                        fail("Unexpected exception");
+                    }
+                }
+            });
+
+        checkPartitionsReservations(grid(1), orgId, 0);
+
+        grid(0).compute().affinityCall(
+            Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+            Integer.valueOf(orgId),
+            new IgniteCallable<Object>() {
+                @IgniteInstanceResource
+                IgniteEx ignite;
+
+                @Override public Object call() {
+                    try {
+                        checkPartitionsReservations(ignite, orgId, 1);
+                    }
+                    catch (Exception e) {
+                        e.printStackTrace();
+                        fail("Unexpected exception");
+                    }
+
+                    return null;
+                }
+            });
+
+        checkPartitionsReservations(grid(1), orgId, 0);
+    }
+
+    /**
+     * Runs an affinityRun and then an affinityCall job that throw a {@link RuntimeException}. Inside each job
+     * {@code checkPartitionsReservations} is called with an expected value of 1; once the exception reaches the
+     * caller it is called on grid(1) with an expected value of 0.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReleasePartitionJobThrowsException() throws Exception {
+        final int orgId = primaryKey(grid(1).cache(Organization.class.getSimpleName()));
+
+        try {
+            grid(0).compute().affinityRun(
+                Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                Integer.valueOf(orgId),
+                new IgniteRunnable() {
+                    @IgniteInstanceResource
+                    IgniteEx ignite;
+
+                    @Override public void run() {
+                        try {
+                            checkPartitionsReservations(ignite, orgId, 1);
+                        }
+                        catch (Exception e) {
+                            e.printStackTrace();
+                            fail("Unexpected exception");
+                        }
+
+                        throw new RuntimeException("Test job throws exception");
+                    }
+                });
+
+            // Not caught below when fail() raises an Error (e.g. JUnit's AssertionFailedError).
+            fail("Exception must be thrown");
+        }
+        catch (Exception e) {
+            checkPartitionsReservations(grid(1), orgId, 0);
+        }
+
+        try {
+            grid(0).compute().affinityCall(
+                Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                Integer.valueOf(orgId),
+                new IgniteCallable<Object>() {
+                    @IgniteInstanceResource
+                    IgniteEx ignite;
+
+                    @Override public Object call() {
+                        try {
+                            checkPartitionsReservations(ignite, orgId, 1);
+                        }
+                        catch (Exception e) {
+                            e.printStackTrace();
+                            fail("Unexpected exception");
+                        }
+
+                        throw new RuntimeException("Test job throws exception");
+                    }
+                });
+
+            // Not caught below when fail() raises an Error (e.g. JUnit's AssertionFailedError).
+            fail("Exception must be thrown");
+        }
+        catch (Exception e) {
+            checkPartitionsReservations(grid(1), orgId, 0);
+        }
+    }
+
+    /**
+     * Runs an affinityRun and then an affinityCall job that throw an {@link Error}. Inside each job
+     * {@code checkPartitionsReservations} is called with an expected value of 1; once the failure reaches the
+     * caller it is called on grid(1) with an expected value of 0.
+     * <p>
+     * The "must be thrown" check is made after the try/catch block: a {@code fail()} placed inside a
+     * {@code try} guarded by {@code catch (Throwable)} would be swallowed by that very catch clause.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReleasePartitionJobThrowsError() throws Exception {
+        final int orgId = primaryKey(grid(1).cache(Organization.class.getSimpleName()));
+
+        boolean runFailed = false;
+
+        try {
+            grid(0).compute().affinityRun(
+                Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                Integer.valueOf(orgId),
+                new IgniteRunnable() {
+                    @IgniteInstanceResource
+                    IgniteEx ignite;
+
+                    @Override public void run() {
+                        try {
+                            checkPartitionsReservations(ignite, orgId, 1);
+                        }
+                        catch (Exception e) {
+                            e.printStackTrace();
+                            fail("Unexpected exception");
+                        }
+
+                        throw new Error("Test job throws error");
+                    }
+                });
+        }
+        catch (Throwable e) {
+            runFailed = true;
+
+            checkPartitionsReservations(grid(1), orgId, 0);
+        }
+
+        assertTrue("Error must be thrown", runFailed);
+
+        boolean callFailed = false;
+
+        try {
+            grid(0).compute().affinityCall(
+                Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                Integer.valueOf(orgId),
+                new IgniteCallable<Object>() {
+                    @IgniteInstanceResource
+                    IgniteEx ignite;
+
+                    @Override public Object call() {
+                        try {
+                            checkPartitionsReservations(ignite, orgId, 1);
+                        }
+                        catch (Exception e) {
+                            e.printStackTrace();
+                            fail("Unexpected exception");
+                        }
+
+                        throw new Error("Test job throws error");
+                    }
+                });
+        }
+        catch (Throwable e) {
+            callFailed = true;
+
+            checkPartitionsReservations(grid(1), orgId, 0);
+        }
+
+        assertTrue("Error must be thrown", callFailed);
+    }
+
+    /**
+     * Submits a {@code JobFailUnmarshaling} job, whose {@code readExternal} always throws, and after the
+     * resulting exception calls {@code checkPartitionsReservations} on grid(1) with an expected value of 0.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReleasePartitionJobUnmarshalingFails() throws Exception {
+        final int orgId = primaryKey(grid(1).cache(Organization.class.getSimpleName()));
+
+        try {
+            grid(0).compute().affinityRun(
+                Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                Integer.valueOf(orgId),
+                new JobFailUnmarshaling());
+
+            // Not caught below when fail() raises an Error (e.g. JUnit's AssertionFailedError).
+            fail("Unmarshaling exception must be thrown");
+        }
+        catch (Exception e) {
+            checkPartitionsReservations(grid(1), orgId, 0);
+        }
+    }
+
+    /**
+     * Starts an asynchronous affinityRun and then an asynchronous affinityCall from grid(1), stops grid(1) while
+     * the job is sleeping, and afterwards calls {@code checkPartitionsReservations} on grid(0) with an expected
+     * value of 0. Grid 1 is started again after each scenario.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReleasePartitionJobMasterLeave() throws Exception {
+        final int orgId = primaryKey(grid(0).cache(Organization.class.getSimpleName()));
+
+        try {
+            grid(1).compute().withAsync().affinityRun(
+                Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                Integer.valueOf(orgId),
+                new IgniteRunnable() {
+                    @IgniteInstanceResource
+                    private Ignite ignite;
+
+                    @Override public void run() {
+                        try {
+                            checkPartitionsReservations((IgniteEx)ignite, orgId, 1);
+                        }
+                        catch (Exception e) {
+                            e.printStackTrace();
+                            fail("Unexpected exception");
+                        }
+
+                        // Keep the job running while the submitting node is stopped.
+                        try {
+                            Thread.sleep(1000);
+                        }
+                        catch (InterruptedException e) {
+                            // No-op.
+                        }
+                    }
+                });
+
+            stopGrid(1, true);
+
+            Thread.sleep(3000);
+
+            awaitPartitionMapExchange();
+
+            checkPartitionsReservations(grid(0), orgId, 0);
+        }
+        finally {
+            // Bring grid 1 back for the next scenario.
+            startGrid(1);
+
+            awaitPartitionMapExchange();
+        }
+
+        try {
+            grid(1).compute().withAsync().affinityCall(
+                Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                Integer.valueOf(orgId),
+                new IgniteCallable<Object>() {
+                    @IgniteInstanceResource
+                    private Ignite ignite;
+
+                    @Override public Object call() {
+                        try {
+                            checkPartitionsReservations((IgniteEx)ignite, orgId, 1);
+                        }
+                        catch (Exception e) {
+                            e.printStackTrace();
+                            fail("Unexpected exception");
+                        }
+
+                        // Keep the job running while the submitting node is stopped.
+                        try {
+                            Thread.sleep(1000);
+                        }
+                        catch (InterruptedException e) {
+                            // No-op.
+                        }
+
+                        return null;
+                    }
+                });
+
+            stopGrid(1, true);
+
+            Thread.sleep(3000);
+
+            awaitPartitionMapExchange();
+
+            checkPartitionsReservations(grid(0), orgId, 0);
+        }
+        finally {
+            // Bring grid 1 back so that later tests see it running.
+            startGrid(1);
+
+            awaitPartitionMapExchange();
+        }
+    }
+
+    /**
+     * Starts an asynchronous affinityRun from grid(1) with a job that also implements
+     * {@link ComputeJobMasterLeaveAware}, stops grid(1) while the job is sleeping, and afterwards calls
+     * {@code checkPartitionsReservations} on grid(0) with an expected value of 0. Grid 1 is started again at the
+     * end.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReleasePartitionJobImplementMasterLeave() throws Exception {
+        final int orgId = primaryKey(grid(0).cache(Organization.class.getSimpleName()));
+
+        try {
+            grid(1).compute().withAsync().affinityRun(
+                Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                Integer.valueOf(orgId),
+                new RunnableWithMasterLeave() {
+                    @IgniteInstanceResource
+                    private Ignite ignite;
+
+                    @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteException {
+                        // No-op.
+                    }
+
+                    @Override public void run() {
+                        try {
+                            checkPartitionsReservations((IgniteEx)ignite, orgId, 1);
+                        }
+                        catch (Exception e) {
+                            e.printStackTrace();
+                            fail("Unexpected exception");
+                        }
+
+                        // Keep the job running while the submitting node is stopped.
+                        try {
+                            Thread.sleep(1000);
+                        }
+                        catch (InterruptedException e) {
+                            // No-op.
+                        }
+                    }
+                });
+
+            stopGrid(1, true);
+
+            Thread.sleep(3000);
+
+            awaitPartitionMapExchange();
+
+            checkPartitionsReservations(grid(0), orgId, 0);
+        }
+        finally {
+            // Bring grid 1 back so that later tests see it running.
+            startGrid(1);
+
+            awaitPartitionMapExchange();
+        }
+    }
+
+    /** Strategy for counting the Person objects of one organization; passed to the test jobs of this class. */
+    private interface PersonsCountGetter {
+        /**
+         * Counts the Person objects that belong to the given organization.
+         *
+         * @param ignite Ignite.
+         * @param log Logger.
+         * @param orgId Org id.
+         * @return Count of found Person objects with the specified orgId.
+         * @throws Exception If failed.
+         */
+        int getPersonsCount(IgniteEx ignite, IgniteLogger log, int orgId) throws Exception;
+    }
+
+    /** Runnable job type that also implements {@link ComputeJobMasterLeaveAware}. */
+    interface RunnableWithMasterLeave extends IgniteRunnable, ComputeJobMasterLeaveAware {
+    }
+
+    /** */
+    private static class TestAffinityCall implements IgniteCallable<Integer> {
+        /** Persons count getter. */
+        PersonsCountGetter personsCntGetter;
+
+        /** Org id. */
+        int orgId;
+
+        /** */
+        @IgniteInstanceResource
+        private IgniteEx ignite;
+
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** */
+        public TestAffinityCall() {
+            // No-op.
+        }
+
+        /**
+         * @param personsCntGetter Object to count Person.
+         * @param orgId Organization Id.
+         */
+        public TestAffinityCall(PersonsCountGetter personsCntGetter, int orgId) {
+            this.personsCntGetter = personsCntGetter;
+            this.orgId = orgId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer call() throws Exception {
+            log.info("Begin call. orgId=" + orgId);
+            return personsCntGetter.getPersonsCount(ignite, log, orgId);
+        }
+    }
+
+    /** Runnable job that logs its start and asserts that the getter reports {@code PERS_AT_ORG_CNT} persons. */
+    private static class TestAffinityRun implements IgniteRunnable {
+        /** Strategy used to count persons. */
+        PersonsCountGetter personsCntGetter;
+
+        /** Organization id the count is requested for. */
+        int orgId;
+
+        /** Ignite instance (injected resource). */
+        @IgniteInstanceResource
+        private IgniteEx ignite;
+
+        /** Logger (injected resource). */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Empty constructor. */
+        public TestAffinityRun() {
+            // No-op.
+        }
+
+        /**
+         * @param personsCntGetter Object to count Person.
+         * @param orgId Organization Id.
+         */
+        public TestAffinityRun(PersonsCountGetter personsCntGetter, int orgId) {
+            this.orgId = orgId;
+            this.personsCntGetter = personsCntGetter;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            try {
+                log.info("Begin run. orgId=" + orgId);
+
+                assertEquals(PERS_AT_ORG_CNT, personsCntGetter.getPersonsCount(ignite, log, orgId));
+            }
+            catch (Exception e) {
+                // Checked exceptions from the getter are rethrown unchecked with the cause preserved.
+                throw new IgniteException(e);
+            }
+        }
+    }
+
+    /** Job that writes nothing when externalized and always throws from {@code readExternal}. */
+    static class JobFailUnmarshaling implements Externalizable, IgniteRunnable {
+        /**
+         * Default constructor (required by Externalizable).
+         */
+        public JobFailUnmarshaling() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            throw new IOException("Test job unmarshaling fails");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            fail("Must not be executed");
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTxCacheOpTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTxCacheOpTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTxCacheOpTest.java
new file mode 100644
index 0000000..3e9f9d6
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunTxCacheOpTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Validates https://issues.apache.org/jira/browse/IGNITE-2310; reuses the parent test with TRANSACTIONAL atomicity.
+ */
+public class IgniteCacheLockPartitionOnAffinityRunTxCacheOpTest
+    extends IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunWithCollisionSpiTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunWithCollisionSpiTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunWithCollisionSpiTest.java
new file mode 100644
index 0000000..c0b896a
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunWithCollisionSpiTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.IgniteSpiAdapter;
+import org.apache.ignite.spi.IgniteSpiContext;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.IgniteSpiMultipleInstancesSupport;
+import org.apache.ignite.spi.collision.CollisionContext;
+import org.apache.ignite.spi.collision.CollisionExternalListener;
+import org.apache.ignite.spi.collision.CollisionJobContext;
+import org.apache.ignite.spi.collision.CollisionSpi;
+import org.apache.ignite.spi.collision.jobstealing.JobStealingCollisionSpi;
+import org.apache.ignite.spi.failover.always.AlwaysFailoverSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Validates https://issues.apache.org/jira/browse/IGNITE-2310 when a CollisionSpi cancels affinity jobs.
+ */
+public class IgniteCacheLockPartitionOnAffinityRunWithCollisionSpiTest
+    extends IgniteCacheLockPartitionOnAffinityRunAbstractTest {
+    /** When {@code true}, {@link AlwaysCancelCollisionSpi} cancels waiting jobs instead of activating them. */
+    private static volatile boolean cancelAllJobs = false;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CollisionSpi colSpi = new AlwaysCancelCollisionSpi();
+
+        cfg.setCollisionSpi(colSpi);
+
+        return cfg;
+    }
+
+    /**
+     * Submits one affinity job while all jobs are cancelled, then checks reservations on the node owning the key.
+     * @throws Exception If failed.
+     */
+    public void testPartitionReservation() throws Exception {
+        int orgId = 0;
+        cancelAllJobs = true;
+        // Workaround for initial update job metadata.
+        try {
+            grid(0).compute().affinityRun(
+                Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                Integer.valueOf(orgId), // Boxed on purpose: presumably selects the affinity-key overload - confirm.
+                new TestRun(orgId));
+        } catch (Exception ignored) {
+            // Swallowed on purpose (e.g. job canceling): only partition release with a CollisionSpi is checked.
+        } finally {
+            cancelAllJobs = false; // Reset the shared static flag even if an Error escapes the run above.
+        }
+        // All partitions must be released in spite of any exceptions during the job executions.
+        ClusterNode n = grid(0).context().affinity().mapKeyToNode(Organization.class.getSimpleName(), orgId);
+        checkPartitionsReservations((IgniteEx)grid(n), orgId, 0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testJobFinishing() throws Exception {
+        // Presumably disabled because affinity run / call doesn't receive response where many job rejections happen.
+        final AtomicInteger jobNum = new AtomicInteger(0);
+
+        cancelAllJobs = true;
+
+        IgniteInternalFuture<Long> affFut = null;
+        try {
+            affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                @Override public void run() {
+                    while (System.currentTimeMillis() < endTime) {
+                        int n = 0;
+                        try {
+                            for (final int orgId : orgIds) {
+                                if (System.currentTimeMillis() >= endTime)
+                                    break;
+
+                                n = jobNum.getAndIncrement();
+                                log.info("+++ Job submitted " + n);
+                                grid(0).compute().affinityRun(
+                                    Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                                    Integer.valueOf(orgId),
+                                    new TestRun(n));
+                            }
+                        }
+                        catch (Exception e) {
+                            // Swallow exceptions on run (e.g. job canceling etc.); they are only logged.
+                            log.info("+++ Job failed " + n + " " + e.toString());
+                        }
+                    }
+                }
+            }, AFFINITY_THREADS_CNT, "affinity-run");
+        }
+        finally {
+            try {
+                if (affFut != null)
+                    affFut.get();
+            }
+            finally {
+                // Undo the shared state even when waiting for the worker threads fails.
+                stopRestartThread.set(true);
+                cancelAllJobs = false;
+            }
+            // Should not be timed out.
+            awaitPartitionMapExchange();
+        }
+    }
+
+    /**
+     * No-op job submitted through {@code affinityRun} by the tests of this class.
+     * It only stores the job number passed to the constructor.
+     */
+    private static class TestRun implements IgniteRunnable {
+        /** Job number. */
+        private int jobNum;
+
+        /** Ignite Logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** Default constructor; leaves the job number at zero. */
+        public TestRun() {
+            // No-op.
+        }
+
+        /**
+         * @param jobNum Job number.
+         */
+        public TestRun(int jobNum) {
+            this.jobNum = jobNum;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            // No-op.
+        }
+    }
+
+    /** Collision SPI that cancels all waiting jobs while {@code cancelAllJobs} is set and activates them otherwise. */
+    @SuppressWarnings({"PublicInnerClass"})
+    @IgniteSpiMultipleInstancesSupport(true)
+    public static class AlwaysCancelCollisionSpi extends IgniteSpiAdapter implements CollisionSpi {
+        /** Grid logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public void onCollision(CollisionContext ctx) {
+            Collection<CollisionJobContext> waitJobs = ctx.waitingJobs();
+            if (cancelAllJobs) {
+                for (CollisionJobContext job : waitJobs)
+                    job.cancel();
+            } else {
+                for (CollisionJobContext job : waitJobs)
+                    job.activate();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStart(String gridName) throws IgniteSpiException {
+            // Start SPI start stopwatch.
+            startStopwatch();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStop() throws IgniteSpiException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setExternalCollisionListener(CollisionExternalListener lsnr) {
+            // No-op.
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheAffinityRunTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheAffinityRunTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheAffinityRunTestSuite.java
new file mode 100644
index 0000000..ef00fc3
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheAffinityRunTestSuite.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheLockPartitionOnAffinityRunTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheLockPartitionOnAffinityRunTxCacheOpTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheLockPartitionOnAffinityRunWithCollisionSpiTest;
+
+/**
+ * Suite of compute and cache tests for affinityRun/affinityCall. These tests are kept in a separate suite
+ * because they take a long time to run.
+ */
+public class IgniteCacheAffinityRunTestSuite extends TestSuite {
+    /**
+     * @return Suite containing the four affinity-run test classes, in the order they are added here.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite() throws Exception {
+        final TestSuite affinityRunSuite = new TestSuite("Ignite Compute and Cache Affinity Run Test Suite");
+
+        affinityRunSuite.addTestSuite(IgniteCacheLockPartitionOnAffinityRunTest.class);
+        affinityRunSuite.addTestSuite(IgniteCacheLockPartitionOnAffinityRunWithCollisionSpiTest.class);
+        affinityRunSuite.addTestSuite(IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.class);
+        affinityRunSuite.addTestSuite(IgniteCacheLockPartitionOnAffinityRunTxCacheOpTest.class);
+
+        return affinityRunSuite;
+    }
+}
\ No newline at end of file


Mime
View raw message