From: vozerov@apache.org
To: commits@ignite.apache.org
Date: Mon, 13 Feb 2017 11:30:10 -0000
Message-Id: <9e8c8dfb80894a9fbbf95eed33d211e1@git.apache.org>
In-Reply-To: <386653f669a24bbd84e5e61fe97baced@git.apache.org>
References: <386653f669a24bbd84e5e61fe97baced@git.apache.org>
Subject: [30/40] ignite git commit: Merge branch 'ignite-1.7.6'

Merge branch 'ignite-1.7.6'

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aaeda721
Tree:
http://git-wip-us.apache.org/repos/asf/ignite/tree/aaeda721
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aaeda721

Branch: refs/heads/ignite-2.0
Commit: aaeda7214f738dff2fbd865e83250413a9b7cc0f
Parents: e1c3dda f350578
Author: agura
Authored: Thu Feb 9 20:29:48 2017 +0300
Committer: agura
Committed: Thu Feb 9 20:29:48 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java | 3 +
 .../org/apache/ignite/cache/QueryEntity.java | 21 +
 .../org/apache/ignite/cache/query/SqlQuery.java | 25 +
 .../processors/cache/EntryGetResult.java | 65 +++
 .../processors/cache/GridCacheAdapter.java | 125 +++--
 .../processors/cache/GridCacheContext.java | 4 +-
 .../processors/cache/GridCacheEntryEx.java | 42 +-
 .../processors/cache/GridCacheMapEntry.java | 140 +++++-
 .../processors/cache/GridCacheUtils.java | 3 +
 .../processors/cache/IgniteCacheProxy.java | 3 +
 .../processors/cache/ReaderArguments.java | 74 +++
 .../distributed/dht/GridDhtCacheAdapter.java | 9 +-
 .../cache/distributed/dht/GridDhtGetFuture.java | 83 ++--
 .../distributed/dht/GridDhtGetSingleFuture.java | 75 ++-
 .../dht/GridPartitionedGetFuture.java | 10 +-
 .../dht/GridPartitionedSingleGetFuture.java | 10 +-
 .../dht/atomic/GridDhtAtomicCache.java | 12 +-
 .../dht/colocated/GridDhtColocatedCache.java | 10 +-
 .../distributed/near/GridNearGetFuture.java | 19 +-
 .../local/atomic/GridLocalAtomicCache.java | 11 +-
 .../cache/query/GridCacheQueryManager.java | 83 +++-
 .../continuous/CacheContinuousQueryHandler.java | 81 +++-
 .../cache/transactions/IgniteTxHandler.java | 2 +-
 .../transactions/IgniteTxLocalAdapter.java | 65 +--
 .../processors/query/GridQueryIndexing.java | 4 +-
 .../processors/query/GridQueryProcessor.java | 79 ++--
 .../query/GridQueryTypeDescriptor.java | 7 +
 .../communication/tcp/TcpCommunicationSpi.java | 16 +
 .../ignite/spi/discovery/tcp/ClientImpl.java | 90 +++-
 .../ignite/spi/discovery/tcp/ServerImpl.java | 63 ++-
 .../messages/TcpDiscoveryAbstractMessage.java | 21 +
 modules/core/src/test/config/log4j-test.xml | 6 +
 .../cache/CacheConcurrentReadThroughTest.java | 184 ++++++++
 .../processors/cache/GridCacheTestEntryEx.java | 30 +-
 .../near/GridNearCacheStoreUpdateTest.java | 466 +++++++++++++++++++
 .../GridNearOffheapCacheStoreUpdateTest.java | 35 ++
 .../cache/query/IndexingSpiQuerySelfTest.java | 69 ++-
 .../IndexingSpiQueryWithH2IndexingSelfTest.java | 36 ++
 .../tcp/TcpCommunicationSpiDropNodesTest.java | 322 +++++++++++++
 .../TcpCommunicationSpiFaultyClientTest.java | 265 +++++++++++
 .../ignite/testframework/GridTestNode.java | 1 +
 .../testframework/junits/GridAbstractTest.java | 2 +
 .../testsuites/IgniteCacheTestSuite2.java | 7 +
 .../IgniteSpiCommunicationSelfTestSuite.java | 5 +
 .../IgniteSpiDiscoverySelfTestSuite.java | 2 +-
 .../processors/query/h2/IgniteH2Indexing.java | 30 +-
 ...CacheScanPartitionQueryFallbackSelfTest.java | 2 +-
 .../cache/IgniteCacheAbstractQuerySelfTest.java | 294 ++++++++++++
 .../IgniteCachePartitionedQuerySelfTest.java | 85 ++++
 .../h2/GridIndexingSpiAbstractSelfTest.java | 29 +-
 50 files changed, 2719 insertions(+), 406 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 8d0a962,59665bb..264fa14
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@@ -1995,12 -2023,11 +2016,12 @@@ public abstract class GridCacheAdapter<
  GridCacheEntryEx entry = entryEx(key);

  try {
-     GridCacheVersion verSet = entry.versionedValue(cacheVal,
-         ver,
+     T2 verVal = entry.versionedValue(
+         cacheVal,
+         res.version(),
          null,
-         expiry);
-
-     boolean set = verSet != null;
++        expiry,
+         readerArgs);

      if (log.isDebugEnabled())
          log.debug("Set value loaded from store into entry [" +

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 9e9b496,51f423a..f26288f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@@ -725,15 -755,15 +755,17 @@@ public interface GridCacheEntryEx
   * @param val New value.
   * @param curVer Version to match or {@code null} if match is not required.
   * @param newVer Version to set.
+  * @param loadExpiryPlc Expiry policy if entry is loaded from store.
-  * @return Non null version if value was set.
+  * @param readerArgs Reader will be added if not null.
+  * @return Current version and value.
   * @throws IgniteCheckedException If index could not be updated.
   * @throws GridCacheEntryRemovedException If entry was removed.
   */
- public GridCacheVersion versionedValue(CacheObject val,
+ public T2 versionedValue(CacheObject val,
      @Nullable GridCacheVersion curVer,
      @Nullable GridCacheVersion newVer,
-     @Nullable IgniteCacheExpiryPolicy loadExpiryPlc)
++    @Nullable IgniteCacheExpiryPolicy loadExpiryPlc,
+     @Nullable ReaderArguments readerArgs)
      throws IgniteCheckedException, GridCacheEntryRemovedException;

  /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 52b779d,59e4181..942ae21
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@@ -3549,14 -3609,28 +3609,29 @@@ public abstract class GridCacheMapEntr
  }

  /** {@inheritDoc} */
- @Override public synchronized GridCacheVersion versionedValue(CacheObject val,
+ @Override public synchronized void clearReserveForLoad(GridCacheVersion ver) throws IgniteCheckedException {
+     if (obsoleteVersionExtras() != null)
+         return;
+
+     if (ver.equals(this.ver)) {
+         assert evictionDisabled() : this;
+
+         flags &= ~IS_EVICT_DISABLED;
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized T2 versionedValue(CacheObject val,
      GridCacheVersion curVer,
      GridCacheVersion newVer,
-     @Nullable IgniteCacheExpiryPolicy loadExpiryPlc)
++    @Nullable IgniteCacheExpiryPolicy loadExpiryPlc,
+     @Nullable ReaderArguments readerArgs)
-     throws IgniteCheckedException, GridCacheEntryRemovedException
- {
+     throws IgniteCheckedException, GridCacheEntryRemovedException {
+     checkObsolete();
+     addReaderIfNeed(readerArgs);
+     if (curVer == null || curVer.equals(ver)) {
          if (val != this.val) {
              GridCacheMvcc mvcc = mvccExtras();
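
Note on the hunks above: the Javadoc changes from "Non null version if value was set" to "Current version and value", and callers now read the version through verVal.get2(). The sketch below illustrates that calling convention only. It is a simplified stand-in, not Ignite code: Pair, Entry and VersionedValueSketch are hypothetical names, the value is a plain String, the version is a plain long, and the expiry policy and reader arguments are left out.

```java
public class VersionedValueSketch {
    /** Hypothetical stand-in for a two-element tuple such as Ignite's T2. */
    static final class Pair<A, B> {
        final A first;
        final B second;

        Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }
    }

    /** Hypothetical, heavily simplified cache entry (not GridCacheMapEntry). */
    static final class Entry {
        private String val;
        private long ver;

        Entry(String val, long ver) {
            this.val = val;
            this.ver = ver;
        }

        /**
         * Sets the value only if curVer is null or matches the entry version.
         * Always returns the entry's current value and version, so the caller
         * sees the winning state even when its own update was skipped.
         */
        synchronized Pair<String, Long> versionedValue(String newVal, Long curVer, long newVer) {
            if (curVer == null || curVer == ver) {
                val = newVal;
                ver = newVer;
            }

            return new Pair<>(val, ver);
        }
    }

    public static void main(String[] args) {
        Entry entry = new Entry("old", 1L);

        // Version matches: the loaded value is applied.
        Pair<String, Long> applied = entry.versionedValue("loaded", 1L, 2L);
        System.out.println(applied.first + " @ " + applied.second);

        // Version is stale: nothing is written, the current state is returned.
        Pair<String, Long> skipped = entry.versionedValue("stale", 1L, 3L);
        System.out.println(skipped.first + " @ " + skipped.second);
    }
}
```

Under these assumptions the caller no longer needs a null check to learn whether the value was set; it can continue with whatever version the entry reports, which is what the updated call sites in this commit appear to do.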

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e657f32,f601e0a..1b6179e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@@ -2381,7 -2293,7 +2383,7 @@@ public class GridDhtAtomicCache e
  try {
      GridCacheVersion ver = entry.version();

--     entry.versionedValue(ctx.toCacheObject(v), null, ver, null);
++     entry.versionedValue(ctx.toCacheObject(v), null, ver, null, null);
  }
  catch (GridCacheEntryRemovedException e) {
      assert false : "Entry should not get obsolete while holding lock [entry=" + entry +

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 63b0717,83edab4..a9a7d7c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@@ -1253,8 -1312,8 +1312,8 @@@ public class CacheContinuousQueryHandle
  try {
      cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
  }
- catch (ClusterTopologyCheckedException e) {
+ catch (ClusterTopologyCheckedException ignored) {
-     IgniteLogger log = ctx.log(getClass());
+     IgniteLogger log = ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY);

      if (log.isDebugEnabled())
          log.debug("Failed to send acknowledge message, node left " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 7ceb701,f05d90d..cd4c55c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@@ -439,8 -437,9 +440,9 @@@ public abstract class IgniteTxLocalAdap
      CU.subjectId(this, cctx),
      null,
      resolveTaskName(),
-     expiryPlc,
+     expiryPlc0,
-     txEntry == null ? keepBinary : txEntry.keepBinary());
+     txEntry == null ? keepBinary : txEntry.keepBinary(),
+     null);

  if (res == null) {
      if (misses == null)
@@@ -476,20 -475,22 +478,23 @@@
  CacheObject cacheVal = cacheCtx.toCacheObject(val);

  while (true) {
-     GridCacheEntryEx entry = cacheCtx.cache().entryEx(key);
+     GridCacheEntryEx entry = cacheCtx.cache().entryEx(key, topVer);

      try {
-         GridCacheVersion setVer = entry.versionedValue(cacheVal, ver, null, null);
-
-         boolean set = setVer != null;
+         T2 verVal = entry.versionedValue(cacheVal,
+             ver,
+             null,
++            null,
+             null);

-         if (set)
-             ver = setVer;
+         if (log.isDebugEnabled()) {
+             log.debug("Set value loaded from store into entry [" +
+                 "oldVer=" + ver +
+                 ", newVer=" + verVal.get2() +
+                 ", entry=" + entry + ']');
+         }

-         if (log.isDebugEnabled())
-             log.debug("Set value loaded from store into entry [set=" + set +
-                 ", curVer=" + ver + ", newVer=" + setVer + ", " +
-                 "entry=" + entry + ']');
+         ver = verVal.get2();

          break;
      }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 9fdadf3,8db68b4..e76ab40
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@@ -664,10 -686,10 +685,11 @@@ public class GridCacheTestEntryEx exten
  }

  /** @inheritDoc */
- @Override public GridCacheVersion versionedValue(CacheObject val,
+ @Override public T2 versionedValue(CacheObject val,
      GridCacheVersion curVer,
      GridCacheVersion newVer,
-     IgniteCacheExpiryPolicy loadExpiryPlc) {
++    @Nullable IgniteCacheExpiryPolicy loadExpiryPlc,
+     @Nullable ReaderArguments readerArgs) {
      assert false;

      return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
index 0000000,d29231e..b530e36
mode 000000,100644..100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
@@@ -1,0 -1,322 +1,322 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF)
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.spi.communication.tcp; + + import java.util.Collections; + import java.util.HashMap; + import java.util.Map; + import java.util.concurrent.Callable; + import java.util.concurrent.CountDownLatch; + import java.util.concurrent.CyclicBarrier; + import java.util.concurrent.TimeUnit; + import org.apache.ignite.IgniteCheckedException; + import org.apache.ignite.cluster.ClusterNode; + import org.apache.ignite.configuration.IgniteConfiguration; + import org.apache.ignite.events.Event; + import org.apache.ignite.internal.IgniteEx; + import org.apache.ignite.internal.IgniteInternalFuture; + import org.apache.ignite.internal.util.lang.GridAbsPredicate; + import org.apache.ignite.internal.util.nio.GridCommunicationClient; + import org.apache.ignite.internal.util.typedef.internal.U; + import org.apache.ignite.lang.IgniteBiPredicate; + import org.apache.ignite.lang.IgnitePredicate; + import org.apache.ignite.lang.IgniteRunnable; + import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; + import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; + import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; + import 
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; + import org.apache.ignite.testframework.GridTestUtils; + import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + + import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; + + /** + * + */ + public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Nodes count. */ + private static final int NODES_CNT = 4; + + /** Block. */ + private static volatile boolean block; + + /** Predicate. */ + private static IgniteBiPredicate pred; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setClockSyncFrequency(300000); + cfg.setFailureDetectionTimeout(1000); + + TestCommunicationSpi spi = new TestCommunicationSpi(); + + spi.setIdleConnectionTimeout(100); + spi.setSharedMemoryPort(-1); + + TcpDiscoverySpi discoSpi = (TcpDiscoverySpi) cfg.getDiscoverySpi(); + discoSpi.setIpFinder(IP_FINDER); + + cfg.setCommunicationSpi(spi); + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + block = false; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testOneNode() throws Exception { + pred = new IgniteBiPredicate() { + @Override public boolean apply(ClusterNode locNode, ClusterNode rmtNode) { + return block && rmtNode.order() == 3; + } + }; + + startGrids(NODES_CNT); + + final CountDownLatch latch = new CountDownLatch(1); + + grid(0).events().localListen(new IgnitePredicate() { + @Override + public boolean apply(Event event) { + latch.countDown(); + + return true; + } + }, EVT_NODE_FAILED); + + U.sleep(1000); // Wait for write timeout and closing idle connections. + + block = true; + + grid(0).compute().broadcast(new IgniteRunnable() { + @Override public void run() { + // No-op. + } + }); + + assertTrue(latch.await(15, TimeUnit.SECONDS)); + + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return grid(3).cluster().topologyVersion() == NODES_CNT + 1; + } + }, 5000)); + + for (int i = 0; i < 10; i++) { + U.sleep(1000); + + assertEquals(NODES_CNT - 1, grid(0).cluster().nodes().size()); + + int liveNodesCnt = 0; + + for (int j = 0; j < NODES_CNT; j++) { + IgniteEx ignite; + + try { + ignite = grid(j); + + log.info("Checking topology for grid(" + j + "): " + ignite.cluster().nodes()); + + ClusterNode locNode = ignite.localNode(); + + if (locNode.order() != 3) { + assertEquals(NODES_CNT - 1, ignite.cluster().nodes().size()); + + for (ClusterNode node : ignite.cluster().nodes()) + assertTrue(node.order() != 3); + + liveNodesCnt++; + } + } + catch (Exception e) { + log.info("Checking topology for grid(" + j + "): no grid in topology."); + } + } + + assertEquals(NODES_CNT - 1, liveNodesCnt); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testTwoNodesEachOther() throws Exception { + pred = new IgniteBiPredicate() { + @Override public boolean apply(ClusterNode locNode, ClusterNode rmtNode) { + return block && (locNode.order() == 2 || locNode.order() == 4) && + (rmtNode.order() == 2 || rmtNode.order() == 4); + } + }; + + startGrids(NODES_CNT); + + final CountDownLatch latch = new CountDownLatch(1); + + grid(0).events().localListen(new IgnitePredicate() { + @Override + public boolean apply(Event event) { + latch.countDown(); + + return true; + } + }, EVT_NODE_FAILED); + + U.sleep(1000); // Wait for write timeout and closing idle connections. + + block = true; + + final CyclicBarrier barrier = new CyclicBarrier(2); + + IgniteInternalFuture fut1 = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + barrier.await(); + + grid(1).compute().withNoFailover().broadcast(new IgniteRunnable() { + @Override public void run() { + // No-op. + } + }); + + return null; + } + }); + + IgniteInternalFuture fut2 = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + barrier.await(); + + grid(3).compute().withNoFailover().broadcast(new IgniteRunnable() { + @Override public void run() { + // No-op. + } + }); + + return null; + } + }); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return grid(2).cluster().nodes().size() == NODES_CNT - 1; + } + }, 5000); + + try { + fut1.get(); + } + catch (IgniteCheckedException e) { + // No-op. + } + + try { + fut2.get(); + } + catch (IgniteCheckedException e) { + // No-op. 
+ } + + long failedNodeOrder = 1 + 2 + 3 + 4; + + for (ClusterNode node : grid(0).cluster().nodes()) + failedNodeOrder -= node.order(); + + for (int i = 0; i < 10; i++) { + U.sleep(1000); + + assertEquals(NODES_CNT - 1, grid(0).cluster().nodes().size()); + + int liveNodesCnt = 0; + + for (int j = 0; j < NODES_CNT; j++) { + IgniteEx ignite; + + try { + ignite = grid(j); + + log.info("Checking topology for grid(" + j + "): " + ignite.cluster().nodes()); + + ClusterNode locNode = ignite.localNode(); + + if (locNode.order() != failedNodeOrder) { + assertEquals(NODES_CNT - 1, ignite.cluster().nodes().size()); + + for (ClusterNode node : ignite.cluster().nodes()) + assertTrue(node.order() != failedNodeOrder); + + liveNodesCnt++; + } + } + catch (Exception e) { + log.info("Checking topology for grid(" + j + "): no grid in topology."); + } + } + + assertEquals(NODES_CNT - 1, liveNodesCnt); + } + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** {@inheritDoc} */ - @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException { ++ @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException { + if (pred.apply(getLocalNode(), node)) { + Map attrs = new HashMap<>(node.attributes()); + + attrs.put(createAttributeName(ATTR_ADDRS), Collections.singleton("127.0.0.1")); + attrs.put(createAttributeName(ATTR_PORT), 47200); + attrs.put(createAttributeName(ATTR_EXT_ADDRS), Collections.emptyList()); + attrs.put(createAttributeName(ATTR_HOST_NAMES), Collections.emptyList()); + + ((TcpDiscoveryNode)node).setAttributes(attrs); + } + - return super.createTcpClient(node); ++ return super.createTcpClient(node, connIdx); + } + + /** + * @param name Name. + */ + private String createAttributeName(String name) { + return getClass().getSimpleName() + '.' 
+ name; + } + } + } http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java index 0000000,6e99487..c21e6ce mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java @@@ -1,0 -1,270 +1,265 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.spi.communication.tcp; + + import java.io.IOException; -import java.io.OutputStream; + import java.net.InetAddress; + import java.net.ServerSocket; -import java.net.Socket; + import java.util.Collections; + import java.util.HashMap; + import java.util.Map; + import java.util.concurrent.CountDownLatch; + import java.util.concurrent.TimeUnit; -import org.apache.ignite.Ignite; + import org.apache.ignite.IgniteCheckedException; + import org.apache.ignite.IgniteException; + import org.apache.ignite.cluster.ClusterNode; + import org.apache.ignite.configuration.IgniteConfiguration; + import org.apache.ignite.events.Event; + import org.apache.ignite.internal.IgniteInternalFuture; + import org.apache.ignite.internal.IgniteInterruptedCheckedException; + import org.apache.ignite.internal.util.lang.GridAbsPredicate; + import org.apache.ignite.internal.util.nio.GridCommunicationClient; + import org.apache.ignite.internal.util.typedef.internal.U; + import org.apache.ignite.lang.IgnitePredicate; + import org.apache.ignite.lang.IgniteRunnable; + import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; + import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; + import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; + import org.apache.ignite.testframework.GridTestUtils; + import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + + import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; + + /** + * Tests that faulty client will be failed if connection can't be established. 
+ */ + public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Predicate. */ + private static final IgnitePredicate PRED = new IgnitePredicate() { + @Override public boolean apply(ClusterNode node) { + return block && node.order() == 3; + } + }; + + /** Client mode. */ + private static boolean clientMode; + + /** Block. */ + private static volatile boolean block; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setClockSyncFrequency(300000); + cfg.setFailureDetectionTimeout(1000); + cfg.setClientMode(clientMode); + + TestCommunicationSpi spi = new TestCommunicationSpi(); + + spi.setIdleConnectionTimeout(100); + spi.setSharedMemoryPort(-1); + + TcpDiscoverySpi discoSpi = (TcpDiscoverySpi) cfg.getDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + discoSpi.setClientReconnectDisabled(true); + + cfg.setCommunicationSpi(spi); + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + block = false; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testNoServerOnHost() throws Exception { + testFailClient(null); + } + + /** + * @throws Exception If failed. + */ + public void testNotAcceptedConnection() throws Exception { + testFailClient(new FakeServer()); + } + + /** + * @param srv Server. + * @throws Exception If failed. 
+      */
+     private void testFailClient(FakeServer srv) throws Exception {
+         IgniteInternalFuture fut = null;
+
+         try {
+             if (srv != null)
+                 fut = GridTestUtils.runMultiThreadedAsync(srv, 1, "fake-server");
+
+             clientMode = false;
+
+             startGrids(2);
+
+             clientMode = true;
+
+             startGrid(2);
+             startGrid(3);
+
+             U.sleep(1000); // Wait for write timeout and closing idle connections.
+
+             final CountDownLatch latch = new CountDownLatch(1);
+
+             grid(0).events().localListen(new IgnitePredicate<Event>() {
+                 @Override
+                 public boolean apply(Event event) {
+                     latch.countDown();
+
+                     return true;
+                 }
+             }, EVT_NODE_FAILED);
+
+             block = true;
+
+             try {
+                 grid(0).compute(grid(0).cluster().forClients()).withNoFailover().broadcast(new IgniteRunnable() {
+                     @Override public void run() {
+                         // No-op.
+                     }
+                 });
+             }
+             catch (IgniteException e) {
+                 // No-op.
+             }
+
+             assertTrue(latch.await(3, TimeUnit.SECONDS));
+
+             assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                 @Override public boolean apply() {
+                     return grid(0).cluster().forClients().nodes().size() == 1;
+                 }
+             }, 5000));
+
+             for (int i = 0; i < 5; i++) {
+                 U.sleep(1000);
+
+                 log.info("Check topology (" + (i + 1) + "): " + grid(0).cluster().nodes());
+
+                 assertEquals(1, grid(0).cluster().forClients().nodes().size());
+             }
+         }
+         finally {
+             if (srv != null) {
+                 srv.stop();
+
+                 assert fut != null;
+
+                 fut.get();
+             }
+
+             stopAllGrids();
+         }
+     }
+
+     /**
+      * Server that emulates connection troubles.
+      */
+     private static class FakeServer implements Runnable {
+         /** Server. */
+         private final ServerSocket srv;
+
+         /** Stop. */
+         private volatile boolean stop;
+
+         /**
+          * Default constructor.
+          */
+         FakeServer() throws IOException {
+             this.srv = new ServerSocket(47200, 50, InetAddress.getByName("127.0.0.1"));
+         }
+
+         /**
+          *
+          */
+         public void stop() {
+             stop = true;
+         }
+
+         /** {@inheritDoc} */
+         @Override public void run() {
+             try {
+                 while (!stop) {
+                     try {
+                         U.sleep(10);
+                     }
+                     catch (IgniteInterruptedCheckedException e) {
+                         // No-op.
+                     }
+                 }
+             }
+             finally {
+                 U.closeQuiet(srv);
+             }
+         }
+     }
+
+     /**
+      *
+      */
+     private static class TestCommunicationSpi extends TcpCommunicationSpi {
+         /** {@inheritDoc} */
 -        @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
++        @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
+             if (PRED.apply(node)) {
+                 Map<String, Object> attrs = new HashMap<>(node.attributes());
+
+                 attrs.put(createAttributeName(ATTR_ADDRS), Collections.singleton("127.0.0.1"));
+                 attrs.put(createAttributeName(ATTR_PORT), 47200);
+                 attrs.put(createAttributeName(ATTR_EXT_ADDRS), Collections.emptyList());
+                 attrs.put(createAttributeName(ATTR_HOST_NAMES), Collections.emptyList());
+
+                 ((TcpDiscoveryNode)node).setAttributes(attrs);
+             }
+
 -            return super.createTcpClient(node);
++            return super.createTcpClient(node, connIdx);
+         }
+
+         /**
+          * @param name Name.
+          */
+         private String createAttributeName(String name) {
+             return getClass().getSimpleName() + '.' + name;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 69a65fe,cbf2ebd..9416621
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@@ -1077,10 -1051,9 +1077,10 @@@ public class IgniteH2Indexing implement
          final TableDescriptor tbl = tableDescriptor(spaceName, type);
  
          if (tbl == null)
 -            throw new CacheException("Failed to find SQL table for type: " + type.name());
 +            throw new IgniteSQLException("Failed to find SQL table for type: " + type.name(),
 +                IgniteQueryErrorCode.TABLE_NOT_FOUND);
  
-         String sql = generateQuery(qry, tbl);
+         String sql = generateQuery(qry, alias, tbl);
  
          Connection conn = connectionForThread(tbl.schemaName());

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/aaeda721/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------