ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [50/50] [abbrv] ignite git commit: IGNITE-2004 Added tests.
Date Tue, 22 Mar 2016 16:50:26 GMT
IGNITE-2004 Added tests.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3192b7fa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3192b7fa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3192b7fa

Branch: refs/heads/ignite-2004
Commit: 3192b7fa242e5dc4e18dc07f00655f26db2004bf
Parents: 68c94bc
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Tue Mar 22 19:49:07 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Tue Mar 22 19:49:07 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java |  12 +-
 .../continuous/GridContinuousProcessor.java     |   1 -
 ...eContinuousQueryAsyncFilterListenerTest.java | 324 ++++++++++++++-----
 .../IgniteCacheQuerySelfTestSuite.java          |   1 -
 4 files changed, 255 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3192b7fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 7c3e128..5cd7b2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -1313,7 +1313,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         private CacheEntryEventFilter filter;
 
         /** */
-        private IgniteInternalFuture<Boolean> notify;
+        private IgniteInternalFuture<Boolean> notifyFut;
 
         /** */
         private final GridCacheContext<K, V> cctx;
@@ -1349,7 +1349,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
          * @param primary Primary flag.
          * @param cctx Cache context.
          * @param filter Filter.
-         * @param notify
+         * @param notifyFut Notify future.
          * @param evt Event.
          * @param cache Cache.
          */
@@ -1362,7 +1362,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             boolean primary,
             GridCacheContext<K, V> cctx,
             CacheEntryEventFilter filter,
-            IgniteInternalFuture<Boolean> notify,
+            IgniteInternalFuture<Boolean> notifyFut,
             CacheContinuousQueryEvent<K, V> evt,
             IgniteCache cache) {
             this.taskName = taskName;
@@ -1374,7 +1374,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             this.primary = primary;
             this.cctx = cctx;
             this.filter = filter;
-            this.notify = notify;
+            this.notifyFut = notifyFut;
             this.evt = evt;
             this.cache = cache;
         }
@@ -1383,7 +1383,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         @Override public void run() {
             boolean notify;
 
-            if (evt.getFilterFuture() == null) {
+            if (notifyFut == null) {
                 notify = !evt.entry().isFiltered();
 
                 if (notify && filter != null) {
@@ -1397,7 +1397,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
             else {
                 try {
-                    notify = evt.getFilterFuture().get();
+                    notify = notifyFut.get();
                 }
                 catch (IgniteCheckedException e) {
                     U.error(cctx.logger(CacheContinuousQueryHandler.class), "CacheEntryEventFilter
failed.", e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/3192b7fa/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index dd30822..f2d6e1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -76,7 +76,6 @@ import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3192b7fa/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
index 677845f..8eab9d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import java.io.Serializable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import javax.cache.configuration.FactoryBuilder;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryListenerException;
@@ -44,6 +45,7 @@ import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 
@@ -57,6 +59,8 @@ import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
 /**
  *
@@ -82,6 +86,11 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
 
         cfg.setClientMode(client);
 
+        MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi();
+        storeSpi.setExpireCount(1000);
+
+        cfg.setEventStorageSpi(storeSpi);
+
         return cfg;
     }
 
@@ -103,161 +112,244 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
         super.afterTestsStopped();
     }
 
+    ///
+    /// ASYNC FILTER AND LISTENER. TEST LISTENER.
+    ///
+
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerTx() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED));
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerTxOffHeap() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED));
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerTxOffHeapValues() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES));
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerAtomic() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED));
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerReplicatedAtomic() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED));
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerReplicatedAtomicOffHeapValues() throws Exception
{
-        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED));
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerAtomicOffHeap() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED));
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerAtomicOffHeapValues() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED));
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerAtomicWithoutBackup() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED));
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListener() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED));
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInListenerReplicated() throws Exception {
-        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED));
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED),
true, true);
     }
 
+    ///
+    /// ASYNC FILTER AND LISTENER. TEST FILTER.
+    ///
+
     /**
-     * START START START
-     *
-     *
-     *
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterTx() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED));
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterTxOffHeap() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED));
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterTxOffHeapValues() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES));
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterAtomic() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED));
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterReplicatedAtomic() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED));
+        testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterAtomicOffHeap() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED));
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterAtomicOffHeapValues() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED));
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterAtomicWithoutBackup() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED));
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilter() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED));
+        testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED),
true, true);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testNonDeadLockInFilterReplicated() throws Exception {
-        testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED));
+        testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED),
true, true);
+    }
+
+    ///
+    /// ASYNC LISTENER. TEST LISTENER.
+    ///
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED),
false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxOffHeapSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED),
false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterTxOffHeapValuesSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES),
false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED),
false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterReplicatedAtomicSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED),
false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicOffHeapSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED),
false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicOffHeapValuesSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED),
false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterAtomicWithoutBackupSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED),
false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED),
false, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNonDeadLockInFilterReplicatedSyncFilter() throws Exception {
+        testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED),
false, true);
     }
 
     /**
+     * @param ccfg Cache configuration.
+     * @param asyncFilter Async filter.
+     * @param asyncListener Async listener.
      * @throws Exception If failed.
      */
-    public void testNonDeadLockInListener(CacheConfiguration ccfg) throws Exception {
+    public void testNonDeadLockInListener(CacheConfiguration ccfg,
+        final boolean asyncFilter,
+        boolean asyncListener) throws Exception {
         final IgniteCache cache = ignite(0).createCache(ccfg);
 
         try {
@@ -273,8 +365,22 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
 
                 final CountDownLatch latch = new CountDownLatch(1);
 
-                conQry.setLocalListener(new CacheInvokeListener(new IgniteBiInClosure<Ignite,
-                    CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>()
{
+                IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ?
extends QueryTestValue>> fltrClsr =
+                    new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey,
? extends QueryTestValue>>() {
+                        @Override public void apply(Ignite ignite, CacheEntryEvent<? extends
QueryTestKey,
+                            ? extends QueryTestValue> e) {
+                            if (asyncFilter) {
+                                assertFalse("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("sys-"));
+
+                                assertTrue("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("contQry-"));
+                            }
+                        }
+                    };
+
+                IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ?
extends QueryTestValue>> lsnrClsr =
+                    new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey,
? extends QueryTestValue>>() {
                     @Override public void apply(Ignite ignite, CacheEntryEvent<? extends
QueryTestKey,
                         ? extends QueryTestValue> e) {
                         IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
@@ -288,7 +394,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
 
                         try {
                             if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode()
== TRANSACTIONAL)
-                                tx = ignite.transactions().txStart();
+                                tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
 
                             assertEquals(val, val0);
 
@@ -309,7 +415,17 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
                                 tx.close();
                         }
                     }
-                }));
+                };
+
+                if (asyncListener)
+                    conQry.setLocalListener(new CacheInvokeListenerAsync(lsnrClsr));
+                else
+                    conQry.setLocalListener(new CacheInvokeListener(lsnrClsr));
+
+                if (asyncFilter)
+                    conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilterAsync(fltrClsr)));
+                else
+                    conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(fltrClsr)));
 
                 try (QueryCursor qry = cache.query(conQry)) {
                     cache.put(key, val0);
@@ -328,9 +444,14 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
     }
 
     /**
+     * @param ccfg Cache configuration.
+     * @param asyncFilter Async filter.
+     * @param asyncListener Async listener.
      * @throws Exception If failed.
      */
-    public void testNonDeadLockInFilter(CacheConfiguration ccfg) throws Exception {
+    public void testNonDeadLockInFilter(CacheConfiguration ccfg,
+        final boolean asyncFilter,
+        final boolean asyncListener) throws Exception {
         final IgniteCache cache = ignite(0).createCache(ccfg);
 
         try {
@@ -346,64 +467,87 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
 
                 final CountDownLatch latch = new CountDownLatch(1);
 
-                conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(
+                IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ?
extends QueryTestValue>> fltrClsr =
                     new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey,
? extends QueryTestValue>>() {
-                        @Override public void apply(Ignite ignite, CacheEntryEvent<? extends
QueryTestKey,
-                            ? extends QueryTestValue> e) {
-                            assertFalse(Thread.currentThread().getName().contains("sys-"));
+                    @Override public void apply(Ignite ignite, CacheEntryEvent<? extends
QueryTestKey,
+                        ? extends QueryTestValue> e) {
+                        if (asyncFilter) {
+                            assertFalse("Failed: " + Thread.currentThread().getName(),
+                                Thread.currentThread().getName().contains("sys-"));
+
                             assertTrue("Failed: " + Thread.currentThread().getName(),
                                 Thread.currentThread().getName().contains("contQry-"));
+                        }
 
-                            IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
 
-                            QueryTestValue val = e.getValue();
 
-                            if (val == null || !val.equals(new QueryTestValue(1)))
-                                return;
+                        IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName());
 
-                            Transaction tx = null;
+                        QueryTestValue val = e.getValue();
 
-                            try {
-                                if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode()
-                                    == TRANSACTIONAL)
-                                    tx = ignite.transactions().txStart();
+                        if (val == null || !val.equals(new QueryTestValue(1)))
+                            return;
 
-                                assertEquals(val, val0);
+                        Transaction tx = null;
 
-                                cache0.put(key, newVal);
+                        try {
+                            if (cache0.getConfiguration(CacheConfiguration.class)
+                                .getAtomicityMode() == TRANSACTIONAL)
+                                tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
 
-                                if (tx != null)
-                                    tx.commit();
+                            assertEquals(val, val0);
 
-                                latch.countDown();
-                            }
-                            catch (Exception exp) {
-                                log.error("Failed: ", exp);
+                            cache0.put(key, newVal);
 
-                                throw new IgniteException(exp);
-                            }
-                            finally {
-                                if (tx != null)
-                                    tx.close();
-                            }
+                            if (tx != null)
+                                tx.commit();
+
+                            latch.countDown();
                         }
-                    })
-                ));
+                        catch (Exception exp) {
+                            log.error("Failed: ", exp);
 
-                conQry.setLocalListener(new CacheInvokeListener(new IgniteBiInClosure<Ignite,
-                    CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>()
{
-                    @Override public void apply(Ignite ignite, CacheEntryEvent<? extends
QueryTestKey,
-                        ? extends QueryTestValue> e) {
-                        QueryTestValue val = e.getValue();
+                            throw new IgniteException(exp);
+                        }
+                        finally {
+                            if (tx != null)
+                                tx.close();
+                        }
+                    }
+                };
 
-                        if (val == null || !val.equals(new QueryTestValue(1)))
-                            return;
+                IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ?
extends QueryTestValue>> lsnrClsr =
+                    new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey,
? extends QueryTestValue>>() {
+                        @Override public void apply(Ignite ignite, CacheEntryEvent<? extends
QueryTestKey,
+                            ? extends QueryTestValue> e) {
+                            if (asyncListener) {
+                                assertFalse("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("sys-"));
 
-                        assertEquals(val, val0);
+                                assertTrue("Failed: " + Thread.currentThread().getName(),
+                                    Thread.currentThread().getName().contains("contQry-"));
+                            }
 
-                        latch.countDown();
-                    }
-                }));
+                            QueryTestValue val = e.getValue();
+
+                            if (val == null || !val.equals(new QueryTestValue(1)))
+                                return;
+
+                            assertEquals(val, val0);
+
+                            latch.countDown();
+                        }
+                    };
+
+                if (asyncFilter)
+                    conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilterAsync(fltrClsr)));
+                else
+                    conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(fltrClsr)));
+
+                if (asyncListener)
+                    conQry.setLocalListener(new CacheInvokeListenerAsync(lsnrClsr));
+                else
+                    conQry.setLocalListener(new CacheInvokeListener(lsnrClsr));
 
                 try (QueryCursor qry = cache.query(conQry)) {
                     cache.put(key, val0);
@@ -438,11 +582,29 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
         throw new IgniteException("Failed to found primary key.");
     }
 
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TimeUnit.SECONDS.toMillis(10);
+    }
+
+    /**
+     *
+     */
+    private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter implements
CacheAsyncCallback {
+        /**
+         * @param clsr Closure.
+         */
+        public CacheTestRemoteFilterAsync(
+            IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends
QueryTestValue>> clsr) {
+            super(clsr);
+        }
+    }
+
     /**
      *
      */
     private static class CacheTestRemoteFilter implements
-        CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, CacheAsyncCallback
{
+        CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> {
         /** */
         @IgniteInstanceResource
         private Ignite ignite;
@@ -470,8 +632,20 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
     /**
      *
      */
-    private static class CacheInvokeListener implements CacheEntryUpdatedListener<QueryTestKey,
QueryTestValue>,
-        CacheAsyncCallback {
+    private static class CacheInvokeListenerAsync extends CacheInvokeListener implements
CacheAsyncCallback {
+        /**
+         * @param clsr Closure.
+         */
+        public CacheInvokeListenerAsync(
+            IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends
QueryTestValue>> clsr) {
+            super(clsr);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class CacheInvokeListener implements CacheEntryUpdatedListener<QueryTestKey,
QueryTestValue> {
         @IgniteInstanceResource
         private Ignite ignite;
 
@@ -509,7 +683,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr
         CacheMemoryMode memoryMode) {
         CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
 
-        ccfg.setName("test-cache");
+        ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode +
"-" + backups);
         ccfg.setAtomicityMode(atomicityMode);
         ccfg.setCacheMode(cacheMode);
         ccfg.setMemoryMode(memoryMode);

http://git-wip-us.apache.org/repos/asf/ignite/blob/3192b7fa/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 6da8cac..5d52cfc 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -230,7 +230,6 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
         suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
-        suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class);
         suite.addTestSuite(GridCacheContinuousQueryConcurrentTest.class);
         suite.addTestSuite(CacheContinuousQueryFactoryFilterRandomOperationTest.class);
         suite.addTestSuite(CacheContinuousQueryAsyncFilterListenerTest.class);


Mime
View raw message