ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject incubator-ignite git commit: # sprint-2 ignore exception from event listener filter
Date Tue, 17 Feb 2015 08:10:04 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/sprint-2 a4101a5f5 -> 68a3b77b7


# sprint-2 ignore exception from event listener filter


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/68a3b77b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/68a3b77b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/68a3b77b

Branch: refs/heads/sprint-2
Commit: 68a3b77b7492cdf7a098ddd66acdda78ebb75c00
Parents: a4101a5
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Feb 17 11:09:50 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Feb 17 11:09:50 2015 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryManager.java |  16 ++-
 .../IgniteCacheEntryListenerAbstractTest.java   | 106 +++++++++++++++++++
 2 files changed, 120 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/68a3b77b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index fc9811e..8480211 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.security.*;
+import org.apache.ignite.resources.*;
 import org.jdk8.backport.*;
 
 import javax.cache.*;
@@ -651,7 +652,7 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
                     }
                 }
                 catch (Exception e) {
-                    U.error(log, "CacheEntryCreatedListener failed: " + e);
+                    U.error(log, "CacheEntryListener failed: " + e);
                 }
             }
         }
@@ -683,6 +684,10 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
         /** */
         private byte types;
 
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
         /**
          * For {@link Externalizable}.
          */
@@ -703,7 +708,14 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K
 
         /** {@inheritDoc} */
         @Override public boolean evaluate(CacheEntryEvent<? extends K, ? extends V>
evt) {
-            return (types & flag(evt.getEventType())) != 0 && (impl == null ||
impl.evaluate(evt));
+            try {
+                return (types & flag(evt.getEventType())) != 0 && (impl == null
|| impl.evaluate(evt));
+            }
+            catch (Exception e) {
+                U.error(log, "CacheEntryEventFilter failed: " + e);
+
+                return true;
+            }
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/68a3b77b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 0c2cb0c..66892ea 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -87,6 +87,69 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      * @throws Exception If failed.
      */
+    public void testExceptionIgnored() throws Exception {
+        CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Integer, Integer>>() {
+                @Override public CacheEntryListener<Integer, Integer> create() {
+                    return new ExceptionListener();
+                }
+            },
+            null,
+            false,
+            false
+        );
+
+        IgniteCache<Integer, Integer> cache = jcache();
+
+        cache.registerCacheEntryListener(lsnrCfg);
+
+        try {
+            for (Integer key : keys()) {
+                log.info("Check listener exceptions are ignored [key=" + key + ']');
+
+                cache.put(key, key);
+
+                cache.remove(key);
+            }
+        }
+        finally {
+            cache.deregisterCacheEntryListener(lsnrCfg);
+        }
+
+        lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
+            new Factory<CacheEntryListener<Integer, Integer>>() {
+                @Override public CacheEntryListener<Integer, Integer> create() {
+                    return new CreateUpdateRemoveExpireListener();
+                }
+            },
+            new Factory<CacheEntryEventFilter<? super Integer, ? super Integer>>()
{
+                @Override public CacheEntryEventFilter<? super Integer, ? super Integer>
create() {
+                    return new ExceptionFilter();
+                }
+            },
+            false,
+            false
+        );
+
+        cache.registerCacheEntryListener(lsnrCfg);
+
+        try {
+            for (Integer key : keys()) {
+                log.info("Check filter exceptions are ignored [key=" + key + ']');
+
+                cache.put(key, key);
+
+                cache.remove(key);
+            }
+        }
+        finally {
+            cache.deregisterCacheEntryListener(lsnrCfg);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testNoOldValue() throws Exception {
         CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
             new Factory<CacheEntryListener<Integer, Integer>>() {
@@ -960,6 +1023,49 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     /**
      *
      */
+    static class ExceptionFilter implements CacheEntryEventFilter<Integer, Integer>
{
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends
Integer> evt) {
+            throw new RuntimeException("Test filter error.");
+        }
+    }
+
+    /**
+     *
+     */
+    static class ExceptionListener extends CreateUpdateListener
+        implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer,
Integer> {
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
+            error();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
+            error();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
+            error();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer,
? extends Integer>> evts) {
+            error();
+        }
+
+        /**
+         * Throws exception.
+         */
+        private void error() {
+            throw new RuntimeException("Test listener error.");
+        }
+    }
+
+    /**
+     *
+     */
     protected static class ToStringProcessor implements EntryProcessor<Integer, Integer,
String> {
         /** {@inheritDoc} */
         @Override public String process(MutableEntry<Integer, Integer> e, Object...
arguments)


Mime
View raw message