Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 62308200BF0 for ; Thu, 24 Nov 2016 14:24:14 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 60C0B160B30; Thu, 24 Nov 2016 13:24:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8875B160B1E for ; Thu, 24 Nov 2016 14:24:12 +0100 (CET) Received: (qmail 16740 invoked by uid 500); 24 Nov 2016 13:24:11 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 16684 invoked by uid 99); 24 Nov 2016 13:24:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Nov 2016 13:24:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9533BE055D; Thu, 24 Nov 2016 13:24:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: av@apache.org To: commits@ignite.apache.org Date: Thu, 24 Nov 2016 13:24:12 -0000 Message-Id: In-Reply-To: <60e3a79c717a4fc68f717e1a06f724c3@git.apache.org> References: <60e3a79c717a4fc68f717e1a06f724c3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/50] [abbrv] ignite git commit: IGNITE-2079 GridCacheIoManager eats exception trail if it falls into the directed case merger from ignite-2079-2 archived-at: Thu, 24 Nov 2016 13:24:14 -0000 IGNITE-2079 GridCacheIoManager eats exception trail if it falls into the directed case merger from ignite-2079-2 # Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9ddb8be1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9ddb8be1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9ddb8be1 Branch: refs/heads/ignite-4242 Commit: 9ddb8be1243df8e489f7ebc716d315415775439a Parents: 4474046 Author: Dmitriy Govorukhin Authored: Thu Oct 27 17:52:22 2016 +0300 Committer: Dmitriy Govorukhin Committed: Thu Oct 27 17:52:22 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/events/EventType.java | 6 + .../ignite/events/UnhandledExceptionEvent.java | 61 ++++ .../processors/cache/GridCacheIoManager.java | 70 +++-- .../cache/query/GridCacheQueryManager.java | 10 + .../query/GridCacheQueryMetricsAdapter.java | 9 +- .../cache/query/GridCacheQueryResponse.java | 2 +- .../continuous/CacheContinuousQueryHandler.java | 50 +-- ...2pUnmarshallingContinuousQueryErrorTest.java | 302 +++++++++++++++++++ ...niteCacheP2pUnmarshallingErrorTestSuite.java | 6 +- 9 files changed, 455 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/events/EventType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java index 103dbd4..7778f67 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java +++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java @@ -859,6 +859,12 @@ public interface EventType { public static final int EVT_IGFS_FILE_PURGED = 127; /** + * Built-in event type: event for unhandled exception. + * + */ + public static final int EVT_UNHANDLED_EXCEPTION = 128; + + /** * All checkpoint events. This array can be directly passed into * {@link IgniteEvents#localListen(IgnitePredicate, int...)} method to * subscribe to all checkpoint events. http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/events/UnhandledExceptionEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/UnhandledExceptionEvent.java b/modules/core/src/main/java/org/apache/ignite/events/UnhandledExceptionEvent.java new file mode 100644 index 0000000..cb6cd85 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/events/UnhandledExceptionEvent.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.events; + +import org.apache.ignite.cluster.ClusterNode; + +/** + * Cache fail event. + */ +public class UnhandledExceptionEvent extends EventAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private Exception ex; + + /** + * Default constructor. + */ + public UnhandledExceptionEvent() { + } + + /** + * @param node Node. + * @param msg Message. + * @param ex Exception. + * @param type Type. + */ + public UnhandledExceptionEvent(ClusterNode node, String msg, Exception ex, int type) { + super(node, msg, type); + this.ex = ex; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "msg=" + message() + ", type=" + type() + "ex=" + ex; + } + + /** + * + * @return inner exception + */ + public Exception getException() { + return ex; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 78dddd3..5d7cb00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -17,50 +17,26 @@ package org.apache.ignite.internal.processors.cache; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.UnhandledExceptionEvent; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -79,6 +55,12 @@ import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import java.util.*; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.ignite.events.EventType.EVT_UNHANDLED_EXCEPTION; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; /** @@ -693,6 +675,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { break; + case 59: + // No additional actions required, just skipping default switch section, + // since UnhandledException already registered. + break; + case 114: { processMessage(nodeId,msg,c);// Will be handled by Rebalance Demander. } @@ -737,13 +724,34 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { break; - default: - throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message=" - + msg + "]", msg.classError()); + default:{ + String shortMsg = "Failed to send response to node. Unsupported direct type [message=" + msg + "]"; + + IgniteCheckedException e = new IgniteCheckedException(shortMsg, msg.classError()); + + registerUnhandledException(ctx, shortMsg, e); + } } } /** + * @param ctx Grid cache context. + * @param shortMsg Short message. + * @param ex Original Exception. + */ + public static void registerUnhandledException(GridCacheContext ctx, String shortMsg, IgniteCheckedException ex) { + GridKernalContext kctx = ctx.kernalContext(); + + kctx.exceptionRegistry().onException(shortMsg, ex); + + ClusterNode node = ctx.discovery().localNode(); + + UnhandledExceptionEvent evt = new UnhandledExceptionEvent(node, shortMsg, ex, EVT_UNHANDLED_EXCEPTION); + + kctx.event().record(evt); + } + + /** * @param nodeId Node ID. * @param msg Message. * @param c Closure. http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 7bd1a51..97e59c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -326,6 +326,16 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte } /** + * Increment fails counter. + */ + public void onUnhandledException() { + final boolean statsEnabled = cctx.config().isStatisticsEnabled(); + + if (statsEnabled) + metrics.incrementOnFails(); + } + + /** * Processes cache query request. * * @param sndId Sender node id. http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java index e70ea9f..d25b7c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMetricsAdapter.java @@ -172,4 +172,11 @@ public class GridCacheQueryMetricsAdapter implements QueryMetrics, Externalizabl @Override public String toString() { return S.toString(GridCacheQueryMetricsAdapter.class, this); } -} + + /** + * Increment fails counter. + */ + public void incrementOnFails() { + fails.increment(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java index 8492c38..2b86efe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java @@ -357,6 +357,6 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridCacheQueryResponse.class, this); + return S.toString(GridCacheQueryResponse.class, this, super.toString()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 304d031..4c91ea7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -17,28 +17,6 @@ package org.apache.ignite.internal.processors.cache.query.continuous; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableSet; -import java.util.Set; -import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; -import javax.cache.event.CacheEntryEvent; -import javax.cache.event.CacheEntryEventFilter; -import javax.cache.event.CacheEntryUpdatedListener; -import javax.cache.event.EventType; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -61,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter; import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; @@ -82,8 +61,22 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedDeque8; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.event.EventType; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; +import static org.apache.ignite.internal.processors.cache.GridCacheIoManager.registerUnhandledException; /** * Continuous query handler. @@ -688,8 +681,17 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler catch (IgniteCheckedException ex) { if (ignoreClsNotFound) assert internal; - else - U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex); + else { + String shortMsg = "Failed to unmarshal entry."; + + U.error(ctx.log(getClass()), shortMsg, ex); + + GridCacheQueryManager qryMgr = cctx.queries(); + + qryMgr.onUnhandledException(); + + registerUnhandledException(cctx, shortMsg, ex); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.java new file mode 100644 index 0000000..82f5f09 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.events.UnhandledExceptionEvent; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMetricsAdapter; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.thread.IgniteThread; + +/** + * Checks behavior on exception while unmarshalling key for continuous query. + */ +public class IgniteCacheP2pUnmarshallingContinuousQueryErrorTest extends IgniteCacheP2pUnmarshallingErrorTest { + /** + * {@inheritDoc} + */ + @Override protected int gridCount() { + return 3; + } + + /** Used inside InitialQuery listener. */ + private static final CountDownLatch latch = new CountDownLatch(1); + + /** Node where unmarshalling fails with exceptions. */ + private static volatile String failNode; + + /** Used to count UnhandledExceptionEvents at client node. */ + private static final AtomicInteger cnt = new AtomicInteger(); + + /** + * {@inheritDoc} + */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cacheCfg = super.cacheConfiguration(gridName); + + cacheCfg.setStatisticsEnabled(true); + + return cacheCfg; + } + + /** + * {@inheritDoc} + */ + @Override public void testResponseMessageOnUnmarshallingFailed() throws Exception { + IgniteEx client = grid(0); + IgniteEx node1 = grid(1); + IgniteEx node2 = grid(2); + + assert client.configuration().isClientMode() && + !node1.configuration().isClientMode() && + !node2.configuration().isClientMode(); + + failNode = client.name(); + + client.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + UnhandledExceptionEvent uex = (UnhandledExceptionEvent)evt; + + assertTrue(X.getFullStackTrace(uex.getException()). + contains("IOException: Class can not be unmarshalled")); + + cnt.incrementAndGet(); + + return true; + } + }, EventType.EVT_UNHANDLED_EXCEPTION); + + node1.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + fail("This line should newer calls."); + + return true; + } + }, EventType.EVT_UNHANDLED_EXCEPTION); + + ContinuousQuery qry = new ContinuousQuery<>(); + + qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate() { + @Override public boolean apply(TestKey key, String val) { + latch.countDown(); // Gives guarantee query initialized. + + return true; + } + })); + + qry.setLocalListener(new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable> evts) { + fail("This line should newer calls."); + } + }); + + validate( + 0,//execs + 0,//evts + 0,//fails + client, + node1, + node2); + + // Put element before creating QueryCursor. + putPrimary(node1); + + try (QueryCursor> cur = client.cache(null).query(qry)) { + latch.await(); + + validate( + 1,//execs + 0,//evts + 0,//fails + client, + node1, + node2); + + putPrimary(node1); + + validate( + 1,//execs + 1,//evts + 1,//fails + client, + node1, + node2); + + putPrimary(node2); + + validate( + 1,//execs + 2,//evts + 2,//fails + client, + node1, + node2); + } + } + + /** + * @param ignite Ignite. + */ + private void putPrimary(IgniteEx ignite) { + IgniteCache cache = ignite.cache(null); + + cache.put(generateNodeKeys(ignite, cache), "value"); + } + + /** + * @param execs Executions. + * @param evts Events. + * @param failsNum Fails number. + * @param client Client. + * @param node1 Node 1. + * @param node2 Node 2. + */ + private void validate(final int execs, final int evts, final int failsNum, final IgniteEx client, IgniteEx node1, + IgniteEx node2) throws Exception { + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return client.cache(null).queryMetrics().fails() == failsNum; + } + }, 5_000)); + + assertEquals(evts, cnt.intValue()); + + validateCacheQueryMetrics(client, execs, failsNum); + validateCacheQueryMetrics(node1, 0, 0); + validateCacheQueryMetrics(node2, 0, 0); + } + + /** + * @param ignite Ignite. + * @param executions Executions. + * @param fails Fails. + */ + private void validateCacheQueryMetrics(IgniteEx ignite, int executions, int fails) { + IgniteCache cache = ignite.cache(null); + + GridCacheQueryMetricsAdapter metr = (GridCacheQueryMetricsAdapter)cache.queryMetrics(); + + assertEquals(metr.executions(), executions); + + assertEquals(metr.fails(), fails); + } + + /** + * @param node Node. + * @param cache Cache. + */ + private TestKey generateNodeKeys(IgniteEx node, IgniteCache cache) { + + ClusterNode locNode = node.localNode(); + + for (int ind = 0; ind < 100_000; ind++) { + TestKey key = new TestKey("key" + ind); + + if (affinity(cache).isPrimary(locNode, key)) + return key; + } + + throw new IgniteException("Unable to find key keys as primary for cache."); + } + + /** + * + * */ + private static class TestKey implements Externalizable { + /** + * Field. + */ + @QuerySqlField(index = true) + private String field; + + /** + * Required by {@link Externalizable}. + */ + public TestKey() { + } + + /** + * @param field Test key 1. + */ + public TestKey(String field) { + this.field = field; + } + + /** + * {@inheritDoc} + */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.TestKey key = (IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.TestKey)o; + + return !(field != null ? !field.equals(key.field) : key.field != null); + } + + /** + * {@inheritDoc} + */ + @Override public int hashCode() { + return field != null ? field.hashCode() : 0; + } + + /** + * {@inheritDoc} + */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(field); + } + + /** + * {@inheritDoc} + */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + field = (String)in.readObject(); + + if (((IgniteThread)Thread.currentThread()).getGridName().equals(failNode)) + throw new IOException("Class can not be unmarshalled."); + + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9ddb8be1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheP2pUnmarshallingErrorTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheP2pUnmarshallingErrorTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheP2pUnmarshallingErrorTestSuite.java index dfc96dc..b45d134 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheP2pUnmarshallingErrorTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheP2pUnmarshallingErrorTestSuite.java @@ -19,10 +19,7 @@ package org.apache.ignite.testsuites; import java.util.Set; import junit.framework.TestSuite; -import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingErrorTest; -import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingNearErrorTest; -import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingRebalanceErrorTest; -import org.apache.ignite.internal.processors.cache.IgniteCacheP2pUnmarshallingTxErrorTest; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.testframework.GridTestUtils; /** @@ -49,6 +46,7 @@ public class IgniteCacheP2pUnmarshallingErrorTestSuite extends TestSuite { GridTestUtils.addTestIfNeeded(suite, IgniteCacheP2pUnmarshallingNearErrorTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteCacheP2pUnmarshallingRebalanceErrorTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteCacheP2pUnmarshallingTxErrorTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteCacheP2pUnmarshallingContinuousQueryErrorTest.class, ignoredTests); return suite; }