ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [02/50] [abbrv] ignite git commit: IGNITE-2079 GridCacheIoManager eats exception trail if it falls into the directed case merger from ignite-2079-2
Date Thu, 24 Nov 2016 13:24:12 GMT
IGNITE-2079 GridCacheIoManager eats exception trail if it falls into the directed case
merger from ignite-2079-2

# Conflicts:
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9ddb8be1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9ddb8be1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9ddb8be1

Branch: refs/heads/ignite-4242
Commit: 9ddb8be1243df8e489f7ebc716d315415775439a
Parents: 4474046
Author: Dmitriy Govorukhin <dgovorukhin@gridgain.com>
Authored: Thu Oct 27 17:52:22 2016 +0300
Committer: Dmitriy Govorukhin <dgovorukhin@gridgain.com>
Committed: Thu Oct 27 17:52:22 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/events/EventType.java     |   6 +
 .../ignite/events/UnhandledExceptionEvent.java  |  61 ++++
 .../processors/cache/GridCacheIoManager.java    |  70 +++--
 .../cache/query/GridCacheQueryManager.java      |  10 +
 .../query/GridCacheQueryMetricsAdapter.java     |   9 +-
 .../cache/query/GridCacheQueryResponse.java     |   2 +-
 .../continuous/CacheContinuousQueryHandler.java |  50 +--
 ...2pUnmarshallingContinuousQueryErrorTest.java | 302 +++++++++++++++++++
 ...niteCacheP2pUnmarshallingErrorTestSuite.java |   6 +-
 9 files changed, 455 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/events/EventType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index 103dbd4..7778f67 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -859,6 +859,12 @@ public interface EventType {
     public static final int EVT_IGFS_FILE_PURGED = 127;
 
     /**
+     * Built-in event type: event for unhandled exception.
+     *
+     */
+    public static final int EVT_UNHANDLED_EXCEPTION = 128;
+
+    /**
      * All checkpoint events. This array can be directly passed into
      * {@link IgniteEvents#localListen(IgnitePredicate, int...)} method to
      * subscribe to all checkpoint events.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/events/UnhandledExceptionEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/UnhandledExceptionEvent.java
b/modules/core/src/main/java/org/apache/ignite/events/UnhandledExceptionEvent.java
new file mode 100644
index 0000000..cb6cd85
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/events/UnhandledExceptionEvent.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.events;
+
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ * Event that carries an unhandled exception (see {@code EventType#EVT_UNHANDLED_EXCEPTION}).
+ */
+public class UnhandledExceptionEvent extends EventAdapter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private Exception ex;
+
+    /**
+     * Default constructor.
+     */
+    public UnhandledExceptionEvent() {
+    }
+
+    /**
+     * @param node Node.
+     * @param msg Message.
+     * @param ex Exception.
+     * @param type Type.
+     */
+    public UnhandledExceptionEvent(ClusterNode node, String msg, Exception ex, int type)
{
+        super(node, msg, type);
+        this.ex = ex;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "msg=" + message() + ", type=" + type() + "ex=" + ex;
+    }
+
+    /**
+     *
+     * @return inner exception
+     */
+    public Exception getException() {
+        return ex;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 78dddd3..5d7cb00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -17,50 +17,26 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.BinaryObjectException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.UnhandledExceptionEvent;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -79,6 +55,12 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import java.util.*;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.ignite.events.EventType.EVT_UNHANDLED_EXCEPTION;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 
 /**
@@ -693,6 +675,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter
{
 
             break;
 
+            case 59:
+                // No additional actions are required; this case only bypasses the default
+                // switch section, since the unhandled exception has already been registered.
+                break;
+
             case 114: {
                 processMessage(nodeId,msg,c);// Will be handled by Rebalance Demander.
             }
@@ -737,13 +724,34 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter
{
 
             break;
 
-            default:
-                throw new IgniteCheckedException("Failed to send response to node. Unsupported
direct type [message="
-                    + msg + "]", msg.classError());
+            default:{
+                String shortMsg = "Failed to send response to node. Unsupported direct type
[message=" + msg + "]";
+
+                IgniteCheckedException e = new IgniteCheckedException(shortMsg, msg.classError());
+
+                registerUnhandledException(ctx, shortMsg, e);
+            }
         }
     }
 
     /**
+     * @param ctx Grid cache context.
+     * @param shortMsg Short message.
+     * @param ex Original Exception.
+     */
+    public static void registerUnhandledException(GridCacheContext ctx, String shortMsg,
IgniteCheckedException ex) {
+        GridKernalContext kctx = ctx.kernalContext();
+
+        kctx.exceptionRegistry().onException(shortMsg, ex);
+
+        ClusterNode node = ctx.discovery().localNode();
+
+        UnhandledExceptionEvent evt = new UnhandledExceptionEvent(node, shortMsg, ex, EVT_UNHANDLED_EXCEPTION);
+
+        kctx.event().record(evt);
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param msg Message.
      * @param c Closure.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 7bd1a51..97e59c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -326,6 +326,16 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
+     * Increment fails counter.
+     */
+    public void onUnhandledException() {
+        final boolean statsEnabled = cctx.config().isStatisticsEnabled();
+
+        if (statsEnabled)
+            metrics.incrementOnFails();
+    }
+
+    /**
      * Processes cache query request.
      *
      * @param sndId Sender node id.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
index e70ea9f..d25b7c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java
@@ -172,4 +172,11 @@ public class GridCacheQueryMetricsAdapter implements QueryMetrics, Externalizabl
     @Override public String toString() {
         return S.toString(GridCacheQueryMetricsAdapter.class, this);
     }
-}
+
+    /**
+     * Increment fails counter.
+     */
+    public void incrementOnFails() {
+        fails.increment();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
index 8492c38..2b86efe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
@@ -357,6 +357,6 @@ public class GridCacheQueryResponse extends GridCacheMessage implements
GridCach
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridCacheQueryResponse.class, this);
+        return S.toString(GridCacheQueryResponse.class, this, super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 304d031..4c91ea7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -17,28 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryEventFilter;
-import javax.cache.event.CacheEntryUpdatedListener;
-import javax.cache.event.EventType;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -61,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
@@ -82,8 +61,22 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedDeque8;
 
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+import static org.apache.ignite.internal.processors.cache.GridCacheIoManager.registerUnhandledException;
 
 /**
  * Continuous query handler.
@@ -688,8 +681,17 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             catch (IgniteCheckedException ex) {
                 if (ignoreClsNotFound)
                     assert internal;
-                else
-                    U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
+                else {
+                    String shortMsg = "Failed to unmarshal entry.";
+
+                    U.error(ctx.log(getClass()), shortMsg, ex);
+
+                    GridCacheQueryManager qryMgr = cctx.queries();
+
+                    qryMgr.onUnhandledException();
+
+                    registerUnhandledException(cctx, shortMsg, ex);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.java
new file mode 100644
index 0000000..82f5f09
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.events.UnhandledExceptionEvent;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMetricsAdapter;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.thread.IgniteThread;
+
+/**
+ * Checks behavior on exception while unmarshalling key for continuous query.
+ */
+public class IgniteCacheP2pUnmarshallingContinuousQueryErrorTest extends IgniteCacheP2pUnmarshallingErrorTest
{
+    /**
+     * {@inheritDoc}
+     */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** Used inside InitialQuery listener. */
+    private static final CountDownLatch latch = new CountDownLatch(1);
+
+    /** Node where unmarshalling fails with exceptions. */
+    private static volatile String failNode;
+
+    /** Used to count UnhandledExceptionEvents at client node. */
+    private static final AtomicInteger cnt = new AtomicInteger();
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception
{
+        CacheConfiguration cacheCfg = super.cacheConfiguration(gridName);
+
+        cacheCfg.setStatisticsEnabled(true);
+
+        return cacheCfg;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public void testResponseMessageOnUnmarshallingFailed() throws Exception {
+        IgniteEx client = grid(0);
+        IgniteEx node1 = grid(1);
+        IgniteEx node2 = grid(2);
+
+        assert client.configuration().isClientMode() &&
+            !node1.configuration().isClientMode() &&
+            !node2.configuration().isClientMode();
+
+        failNode = client.name();
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                UnhandledExceptionEvent uex = (UnhandledExceptionEvent)evt;
+
+                assertTrue(X.getFullStackTrace(uex.getException()).
+                    contains("IOException: Class can not be unmarshalled"));
+
+                cnt.incrementAndGet();
+
+                return true;
+            }
+        }, EventType.EVT_UNHANDLED_EXCEPTION);
+
+        node1.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                fail("This line should newer calls.");
+
+                return true;
+            }
+        }, EventType.EVT_UNHANDLED_EXCEPTION);
+
+        ContinuousQuery<TestKey, String> qry = new ContinuousQuery<>();
+
+        qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<TestKey, String>()
{
+            @Override public boolean apply(TestKey key, String val) {
+                latch.countDown(); // Gives guarantee query initialized.
+
+                return true;
+            }
+        }));
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<TestKey, String>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends TestKey,
? extends String>> evts) {
+                fail("This line should newer calls.");
+            }
+        });
+
+        validate(
+            0,//execs
+            0,//evts
+            0,//fails
+            client,
+            node1,
+            node2);
+
+        // Put element before creating QueryCursor.
+        putPrimary(node1);
+
+        try (QueryCursor<Cache.Entry<TestKey, String>> cur = client.cache(null).query(qry))
{
+            latch.await();
+
+            validate(
+                1,//execs
+                0,//evts
+                0,//fails
+                client,
+                node1,
+                node2);
+
+            putPrimary(node1);
+
+            validate(
+                1,//execs
+                1,//evts
+                1,//fails
+                client,
+                node1,
+                node2);
+
+            putPrimary(node2);
+
+            validate(
+                1,//execs
+                2,//evts
+                2,//fails
+                client,
+                node1,
+                node2);
+        }
+    }
+
+    /**
+     * @param ignite Ignite.
+     */
+    private void putPrimary(IgniteEx ignite) {
+        IgniteCache<TestKey, Object> cache = ignite.cache(null);
+
+        cache.put(generateNodeKeys(ignite, cache), "value");
+    }
+
+    /**
+     * @param execs Executions.
+     * @param evts Events.
+     * @param failsNum Fails number.
+     * @param client Client.
+     * @param node1 Node 1.
+     * @param node2 Node 2.
+     */
+    private void validate(final int execs, final int evts, final int failsNum, final IgniteEx
client, IgniteEx node1,
+        IgniteEx node2) throws Exception {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return client.cache(null).queryMetrics().fails() == failsNum;
+            }
+        }, 5_000));
+
+        assertEquals(evts, cnt.intValue());
+
+        validateCacheQueryMetrics(client, execs, failsNum);
+        validateCacheQueryMetrics(node1, 0, 0);
+        validateCacheQueryMetrics(node2, 0, 0);
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @param executions Executions.
+     * @param fails Fails.
+     */
+    private void validateCacheQueryMetrics(IgniteEx ignite, int executions, int fails) {
+        IgniteCache<Object, Object> cache = ignite.cache(null);
+
+        GridCacheQueryMetricsAdapter metr = (GridCacheQueryMetricsAdapter)cache.queryMetrics();
+
+        assertEquals(metr.executions(), executions);
+
+        assertEquals(metr.fails(), fails);
+    }
+
+    /**
+     * @param node Node.
+     * @param cache Cache.
+     */
+    private TestKey generateNodeKeys(IgniteEx node, IgniteCache<TestKey, Object> cache)
{
+
+        ClusterNode locNode = node.localNode();
+
+        for (int ind = 0; ind < 100_000; ind++) {
+            TestKey key = new TestKey("key" + ind);
+
+            if (affinity(cache).isPrimary(locNode, key))
+                return key;
+        }
+
+        throw new IgniteException("Unable to find key keys as primary for cache.");
+    }
+
+    /**
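+     * Test key whose {@code readExternal} throws an {@code IOException} on threads of the grid named by {@code failNode}.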
+     * */
+    private static class TestKey implements Externalizable {
+        /**
+         * Field.
+         */
+        @QuerySqlField(index = true)
+        private String field;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public TestKey() {
+        }
+
+        /**
+         * @param field Test key 1.
+         */
+        public TestKey(String field) {
+            this.field = field;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.TestKey key = (IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.TestKey)o;
+
+            return !(field != null ? !field.equals(key.field) : key.field != null);
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public int hashCode() {
+            return field != null ? field.hashCode() : 0;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(field);
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
+            field = (String)in.readObject();
+
+            if (((IgniteThread)Thread.currentThread()).getGridName().equals(failNode))
+                throw new IOException("Class can not be unmarshalled.");
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheP2pUnmarshallingErrorTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheP2pUnmarshallingErrorTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheP2pUnmarshallingErrorTestSuite.java
index dfc96dc..b45d134 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheP2pUnmarshallingErrorTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheP2pUnmarshallingErrorTestSuite.java
@@ -19,10 +19,7 @@ package org.apache.ignite.testsuites;
 
 import java.util.Set;
 import junit.framework.TestSuite;
-import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingErrorTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingNearErrorTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingRebalanceErrorTest;
-import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingTxErrorTest;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.testframework.GridTestUtils;
 
 /**
@@ -49,6 +46,7 @@ public class IgniteCacheP2pUnmarshallingErrorTestSuite extends TestSuite
{
         GridTestUtils.addTestIfNeeded(suite, IgniteCacheP2pUnmarshallingNearErrorTest.class,
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteCacheP2pUnmarshallingRebalanceErrorTest.class,
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteCacheP2pUnmarshallingTxErrorTest.class,
ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.class,
ignoredTests);
 
         return suite;
     }


Mime
View raw message