ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject ignite git commit: IGNITE-2004 Fixed review notes.
Date Wed, 13 Apr 2016 10:57:51 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2004 07d62cea1 -> 39fad6505


IGNITE-2004 Fixed review notes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/39fad650
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/39fad650
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/39fad650

Branch: refs/heads/ignite-2004
Commit: 39fad650555907fc5c1f44d4d5bcd4682b8deab7
Parents: 07d62ce
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Wed Apr 13 13:57:12 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Wed Apr 13 13:57:12 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java | 39 ++++++--
 .../thread/IgniteStripedThreadPoolExecutor.java |  4 +
 ...FailoverAtomicPrimaryWriteOrderSelfTest.java | 49 ++++++++++
 ...sQueryAsyncFailoverTxReplicatedSelfTest.java | 37 ++++++++
 ...eContinuousQueryAsyncFailoverTxSelfTest.java | 44 +++++++++
 ...ContinuousQueryFailoverAbstractSelfTest.java | 63 +++++++++++--
 ...ontinuousQueryOperationFromCallbackTest.java | 78 +++++++++++----
 .../CacheContinuousQueryOrderingEventTest.java  | 99 ++++++++++++++++----
 .../IgniteCacheQuerySelfTestSuite4.java         |  7 ++
 9 files changed, 370 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index ed39a17..dc6d20e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -579,18 +579,43 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         assert objs != null;
         assert ctx != null;
 
-        final Collection<CacheContinuousQueryEntry> ents = (Collection<CacheContinuousQueryEntry>)objs;
+        final Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs;
 
-        if (!ents.isEmpty()) {
+        if (!entries.isEmpty()) {
             if (asyncCallback) {
-                ctx.asyncCallbackPool().execute(new Runnable() {
-                    @Override public void run() {
-                        notifyCallback0(nodeId, ctx, ents);
+                if (entries.size() != 1) {
+                    Map<Integer, Collection<CacheContinuousQueryEntry>> entriesByPart = new HashMap<>();
+
+                    for (CacheContinuousQueryEntry e : entries) {
+                        Collection<CacheContinuousQueryEntry> ents = entriesByPart.get(e.partition());
+
+                        if (ents == null) {
+                            ents = new ArrayList<>(entries.size());
+
+                            entriesByPart.put(e.partition(), ents);
+                        }
+
+                        ents.add(e);
+                    }
+
+                    for (final Map.Entry<Integer, Collection<CacheContinuousQueryEntry>> e : entriesByPart.entrySet()) {
+                        ctx.asyncCallbackPool().execute(new Runnable() {
+                            @Override public void run() {
+                                notifyCallback0(nodeId, ctx, e.getValue());
+                            }
+                        }, e.getKey());
                     }
-                });
+                }
+                else {
+                    ctx.asyncCallbackPool().execute(new Runnable() {
+                        @Override public void run() {
+                            notifyCallback0(nodeId, ctx, entries);
+                        }
+                    }, entries.iterator().next().partition());
+                }
             }
             else
-                notifyCallback0(nodeId, ctx, ents);
+                notifyCallback0(nodeId, ctx, entries);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
index 44ea823..0dc5588 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
@@ -21,6 +21,8 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -29,7 +31,9 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jsr166.ThreadLocalRandom8;
 
 /**
  * An {@link ExecutorService} that executes submitted tasks using pooled grid threads.

http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java
new file mode 100644
index 0000000..be7a115
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+
+/**
+ *
+ */
+public class CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicWriteOrderMode writeOrderMode() {
+        return PRIMARY;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean asyncCallback() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java
new file mode 100644
index 0000000..4460498
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ *
+ */
+public class CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest extends CacheContinuousQueryFailoverTxSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return REPLICATED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean asyncCallback() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java
new file mode 100644
index 0000000..8f0bd0e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ *
+ */
+public class CacheContinuousQueryAsyncFailoverTxSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean asyncCallback() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 4226537..083367c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.PAX;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -99,7 +100,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
-import static org.apache.ignite.cache.CacheMemoryMode.*;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -167,6 +168,13 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     }
 
     /**
+     * @return Async callback flag.
+     */
+    protected boolean asyncCallback() {
+        return false;
+    }
+
+    /**
      * @return Near cache configuration.
      */
     protected NearCacheConfiguration nearCacheConfiguration() {
@@ -476,7 +484,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
         for (int j = 0; j < 50; ++j) {
             ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
-            final CacheEventListener3 lsnr = new CacheEventListener3();
+            final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3()
+                : new CacheEventListener3();
 
             qry.setLocalListener(lsnr);
 
@@ -560,7 +569,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
-        final CacheEventListener3 lsnr = new CacheEventListener3();
+        final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() : new CacheEventListener3();
 
         qry.setLocalListener(lsnr);
 
@@ -721,7 +730,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
-        final CacheEventListener3 lsnr = new CacheEventListener3();
+        final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() : new CacheEventListener3();
 
         qry.setLocalListener(lsnr);
 
@@ -841,7 +850,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         Affinity<Object> aff = qryClient.affinity(null);
 
-        CacheEventListener1 lsnr = new CacheEventListener1(false);
+        CacheEventListener1 lsnr = asyncCallback() ? new CacheEventAsyncListener1(false)
+            : new CacheEventListener1(false);
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
@@ -1545,7 +1555,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         qry.setLocalListener(lsnr);
 
-        qry.setRemoteFilter(new CacheEventFilter());
+        qry.setRemoteFilter(asyncCallback() ? new CacheEventAsyncFilter() : new CacheEventFilter());
 
         QueryCursor<?> cur = qryClnCache.query(qry);
 
@@ -1639,7 +1649,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
                     newQry.setLocalListener(dinLsnr);
 
-                    newQry.setRemoteFilter(new CacheEventFilter());
+                    newQry.setRemoteFilter(asyncCallback() ? new CacheEventAsyncFilter() : new CacheEventFilter());
 
                     dinQry = qryClnCache.query(newQry);
 
@@ -1786,7 +1796,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
 
         final IgniteCache<Object, Object> qryClnCache = qryCln.cache(null);
 
-        final CacheEventListener2 lsnr = new CacheEventListener2();
+        final CacheEventListener2 lsnr = asyncCallback() ? new CacheEventAsyncListener2() : new CacheEventListener2();
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
@@ -2144,6 +2154,19 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     /**
      *
      */
+    @IgniteAsyncCallback
+    private static class CacheEventAsyncListener1 extends CacheEventListener1 {
+        /**
+         * @param saveAll Save all events flag.
+         */
+        CacheEventAsyncListener1(boolean saveAll) {
+            super(saveAll);
+        }
+    }
+
+    /**
+     *
+     */
     private static class CacheEventListener1 implements CacheEntryUpdatedListener<Object, Object> {
         /** */
         private volatile CountDownLatch latch;
@@ -2208,6 +2231,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     /**
      *
      */
+    @IgniteAsyncCallback
+    private static class CacheEventAsyncListener2 extends CacheEventListener2 {
+        // No-op.
+    }
+
+    /**
+     *
+     */
     private static class CacheEventListener2 implements CacheEntryUpdatedListener<Object, Object> {
         /** */
         @LoggerResource
@@ -2275,6 +2306,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     /**
      *
      */
+    @IgniteAsyncCallback
+    public static class CacheEventAsyncListener3 extends CacheEventListener3 {
+        // No-op.
+    }
+
+    /**
+     *
+     */
     public static class CacheEventListener3 implements CacheEntryUpdatedListener<Object, Object>,
         CacheEntryEventSerializableFilter<Object, Object> {
         /** Keys. */
@@ -2303,6 +2342,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
     /**
      *
      */
+    @IgniteAsyncCallback
+    private static class CacheEventAsyncFilter extends CacheEventFilter {
+        // No-op.
+    }
+
+    /**
+     *
+     */
     public static class CacheEventFilter implements CacheEntryEventSerializableFilter<Object, Object> {
         /** {@inheritDoc} */
         @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
index d301036..2ead28a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java
@@ -39,6 +39,7 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -64,6 +65,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
 /**
@@ -136,7 +138,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
      * @throws Exception If failed.
      */
     public void testAtomicTwoBackups() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC);
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, FULL_SYNC);
 
         doTest(ccfg, true);
     }
@@ -145,7 +147,16 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
      * @throws Exception If failed.
      */
     public void testAtomicReplicatedFilter() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC);
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, FULL_SYNC);
+
+        doTest(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedFilterPrimary() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, PRIMARY_SYNC);
 
         doTest(ccfg, false);
     }
@@ -154,7 +165,16 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
      * @throws Exception If failed.
      */
     public void testAtomicTwoBackupsFilter() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC);
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, FULL_SYNC);
+
+        doTest(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicTwoBackupsFilterPrimary() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, PRIMARY_SYNC);
 
         doTest(ccfg, false);
     }
@@ -163,7 +183,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
      * @throws Exception If failed.
      */
     public void testAtomicWithoutBackupsFilter() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC);
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC, FULL_SYNC);
 
         doTest(ccfg, false);
     }
@@ -172,7 +192,16 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
      * @throws Exception If failed.
      */
     public void testTxTwoBackupsFilter() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL);
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, FULL_SYNC);
+
+        doTest(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxTwoBackupsFilterPrimary() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, PRIMARY_SYNC);
 
         doTest(ccfg, false);
     }
@@ -181,7 +210,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
      * @throws Exception If failed.
      */
     public void testTxReplicatedFilter() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, TRANSACTIONAL);
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, TRANSACTIONAL, FULL_SYNC);
 
         doTest(ccfg, false);
     }
@@ -190,7 +219,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
      * @throws Exception If failed.
      */
     public void testTxTwoBackup() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL);
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, FULL_SYNC);
 
         doTest(ccfg, true);
     }
@@ -199,7 +228,16 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
      * @throws Exception If failed.
      */
     public void testTxReplicated() throws Exception {
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL);
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, FULL_SYNC);
+
+        doTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxReplicatedPrimary() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, PRIMARY_SYNC);
 
         doTest(ccfg, true);
     }
@@ -223,7 +261,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
 
             final AtomicInteger cbCntr = new AtomicInteger(0);
 
-            final int threadCnt = 10;
+            final int threadCnt = IgniteConfiguration.DFLT_SYSTEM_CORE_THREAD_CNT * 2;
 
             for (int idx = 0; idx < NODES; idx++) {
                 Set<T2<QueryTestKey, QueryTestValue>> evts = Collections.
@@ -306,12 +344,14 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
             if (fromLsnr) {
                 final int expCnt = qryCntr.get() * NODES * KEYS_FROM_CALLBACK;
 
+                boolean condition = GridTestUtils.waitForCondition(new PA() {
+                    @Override public boolean apply() {
+                        return cbCntr.get() >= expCnt;
+                    }
+                }, TimeUnit.SECONDS.toMillis(60));
+
                 assertTrue("Failed to wait events [exp=" + expCnt + ", act=" + cbCntr.get() + "]",
-                    GridTestUtils.waitForCondition(new PA() {
-                        @Override public boolean apply() {
-                            return cbCntr.get() >= expCnt;
-                        }
-                }, TimeUnit.SECONDS.toMillis(60)));
+                    condition);
 
                 assertEquals(expCnt, cbCntr.get());
 
@@ -326,7 +366,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
                     @Override public boolean apply() {
                         return filterCbCntr.get() >= expInvkCnt;
                     }
-                }, TimeUnit.SECONDS.toMillis(20));
+                }, TimeUnit.SECONDS.toMillis(60));
 
                 assertEquals(expInvkCnt, filterCbCntr.get());
 
@@ -511,18 +551,20 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs
      * @param cacheMode Cache mode.
      * @param backups Number of backups.
      * @param atomicityMode Cache atomicity mode.
+     * @param writeMode Write sync mode.
      * @return Cache configuration.
      */
     protected CacheConfiguration<Object, Object> cacheConfiguration(
         CacheMode cacheMode,
         int backups,
-        CacheAtomicityMode atomicityMode) {
+        CacheAtomicityMode atomicityMode,
+        CacheWriteSynchronizationMode writeMode) {
         CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
 
-        ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + cacheMode + "-" + backups);
+        ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + writeMode + "-" + backups);
         ccfg.setAtomicityMode(atomicityMode);
         ccfg.setCacheMode(cacheMode);
-        ccfg.setWriteSynchronizationMode(PRIMARY_SYNC);
+        ccfg.setWriteSynchronizationMode(writeMode);
         ccfg.setAtomicWriteOrderMode(PRIMARY);
 
         if (cacheMode == PARTITIONED)

http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
index 7efca53..c7d0b6b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -66,6 +67,7 @@ import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
 import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
 /**
@@ -139,7 +141,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      */
     public void testAtomicOnheapTwoBackup() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC,
-            ONHEAP_TIERED);
+            ONHEAP_TIERED, PRIMARY_SYNC);
 
         doOrderingTest(ccfg, false);
     }
@@ -149,7 +151,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      */
     public void testAtomicOffheapTwoBackup() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC,
-            OFFHEAP_TIERED);
+            OFFHEAP_TIERED, PRIMARY_SYNC);
 
         doOrderingTest(ccfg, false);
     }
@@ -159,7 +161,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      */
     public void testAtomicOffheapValuesTwoBackup() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC,
-            OFFHEAP_VALUES);
+            OFFHEAP_VALUES, PRIMARY_SYNC);
 
         doOrderingTest(ccfg, false);
     }
@@ -169,7 +171,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      */
     public void testAtomicReplicatedOffheap() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC,
-            OFFHEAP_TIERED);
+            OFFHEAP_TIERED, PRIMARY_SYNC);
 
         doOrderingTest(ccfg, false);
     }
@@ -179,7 +181,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      */
     public void testTxOnheapTwoBackup() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL,
-            ONHEAP_TIERED);
+            ONHEAP_TIERED, FULL_SYNC);
 
         doOrderingTest(ccfg, false);
     }
@@ -189,7 +191,17 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      */
     public void testTxOnheapWithoutBackup() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL,
-            ONHEAP_TIERED);
+            ONHEAP_TIERED, PRIMARY_SYNC);
+
+        doOrderingTest(ccfg, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOnheapWithoutBackupFullSync() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL,
+            ONHEAP_TIERED, FULL_SYNC);
 
         doOrderingTest(ccfg, false);
     }
@@ -201,7 +213,17 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      */
     public void testAtomicOnheapTwoBackupAsync() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC,
-            ONHEAP_TIERED);
+            ONHEAP_TIERED, PRIMARY_SYNC);
+
+        doOrderingTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOnheapTwoBackupAsyncFullSync() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC,
+            ONHEAP_TIERED, FULL_SYNC);
 
         doOrderingTest(ccfg, true);
     }
@@ -211,7 +233,17 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      */
     public void testAtomicOffheapTwoBackupAsync() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC,
-            OFFHEAP_TIERED);
+            OFFHEAP_TIERED, PRIMARY_SYNC);
+
+        doOrderingTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTwoBackupAsyncFullSync() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC,
+            OFFHEAP_TIERED, FULL_SYNC);
 
         doOrderingTest(ccfg, true);
     }
@@ -221,7 +253,17 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      */
     public void testAtomicOffheapValuesTwoBackupAsync() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC,
-            OFFHEAP_VALUES);
+            OFFHEAP_VALUES, PRIMARY_SYNC);
+
+        doOrderingTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapValuesTwoBackupAsyncFullSync() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC,
+            OFFHEAP_VALUES, FULL_SYNC);
 
         doOrderingTest(ccfg, true);
     }
@@ -231,7 +273,17 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      */
     public void testAtomicReplicatedAsync() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC,
-            ONHEAP_TIERED);
+            ONHEAP_TIERED, PRIMARY_SYNC);
+
+        doOrderingTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedAsyncFullSync() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC,
+            ONHEAP_TIERED, FULL_SYNC);
 
         doOrderingTest(ccfg, true);
     }
@@ -241,7 +293,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      */
     public void testAtomicReplicatedOffheapAsync() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC,
-            OFFHEAP_TIERED);
+            OFFHEAP_TIERED, PRIMARY_SYNC);
 
         doOrderingTest(ccfg, true);
     }
@@ -251,7 +303,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      */
     public void testAtomicOnheapWithoutBackupAsync() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC,
-            ONHEAP_TIERED);
+            ONHEAP_TIERED, PRIMARY_SYNC);
 
         doOrderingTest(ccfg, true);
     }
@@ -261,7 +313,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      */
     public void testTxOnheapTwoBackupAsync() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL,
-            ONHEAP_TIERED);
+            ONHEAP_TIERED, PRIMARY_SYNC);
 
         doOrderingTest(ccfg, true);
     }
@@ -271,7 +323,17 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      */
     public void testTxOnheapAsync() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL,
-            ONHEAP_TIERED);
+            ONHEAP_TIERED, PRIMARY_SYNC);
+
+        doOrderingTest(ccfg, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxOnheapAsyncFullSync() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL,
+            ONHEAP_TIERED, FULL_SYNC);
 
         doOrderingTest(ccfg, true);
     }
@@ -540,20 +602,23 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes
      * @param backups Number of backups.
      * @param atomicityMode Cache atomicity mode.
      * @param memoryMode Cache memory mode.
+     * @param writeMode Cache write mode.
      * @return Cache configuration.
      */
     protected CacheConfiguration<Object, Object> cacheConfiguration(
         CacheMode cacheMode,
         int backups,
         CacheAtomicityMode atomicityMode,
-        CacheMemoryMode memoryMode) {
+        CacheMemoryMode memoryMode,
+        CacheWriteSynchronizationMode writeMode) {
         CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
 
-        ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + backups);
+        ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + memoryMode + "-"
+            + backups);
         ccfg.setAtomicityMode(atomicityMode);
         ccfg.setCacheMode(cacheMode);
         ccfg.setMemoryMode(memoryMode);
-        ccfg.setWriteSynchronizationMode(PRIMARY_SYNC);
+        ccfg.setWriteSynchronizationMode(writeMode);
         ccfg.setAtomicWriteOrderMode(PRIMARY);
 
         if (cacheMode == PARTITIONED)

http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
index fa4e642..c4fcdac 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverTxSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
@@ -44,6 +47,10 @@ public class IgniteCacheQuerySelfTestSuite4 extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
 
+        suite.addTestSuite(CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.class);
+        suite.addTestSuite(CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.class);
+        suite.addTestSuite(CacheContinuousQueryAsyncFailoverTxSelfTest.class);
+
         return suite;
     }
 }


Mime
View raw message