Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 77B5D18CFE for ; Wed, 19 Aug 2015 07:40:27 +0000 (UTC) Received: (qmail 70135 invoked by uid 500); 19 Aug 2015 07:40:27 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 70104 invoked by uid 500); 19 Aug 2015 07:40:27 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 70094 invoked by uid 99); 19 Aug 2015 07:40:27 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Aug 2015 07:40:27 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id E529AC0BCF for ; Wed, 19 Aug 2015 07:40:26 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.795 X-Spam-Level: X-Spam-Status: No, score=0.795 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RP_MATCHES_RCVD=-0.006, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 3cWTfbC3uUsG for ; Wed, 19 Aug 2015 07:40:14 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id ABF8724F45 for ; Wed, 19 Aug 2015 07:39:56 +0000 (UTC) Received: (qmail 67578 invoked by uid 99); 19 Aug 2015 07:39:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Aug 2015 07:39:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 68A6DE35D7; Wed, 19 Aug 2015 07:39:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 19 Aug 2015 07:40:37 -0000 Message-Id: <10b8e325c3e4444f828a15e2d6876e4c@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [43/50] incubator-ignite git commit: Squashed commit for ignite-1239 Squashed commit for ignite-1239 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/45c813af Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/45c813af Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/45c813af Branch: refs/heads/ignite-gg-9615-1 Commit: 45c813af7eb4a11ec59d3477a6a0b68791f1d7f2 Parents: 7635e58 Author: Denis Magda Authored: Mon Aug 17 16:51:41 2015 +0300 Committer: Yakov Zhdanov Committed: Mon Aug 17 16:51:41 2015 +0300 ---------------------------------------------------------------------- .../GridDhtUnreservedPartitionException.java | 66 ++++++++++++++ .../cache/query/GridCacheQueryAdapter.java | 56 ++++++++++-- .../cache/query/GridCacheQueryManager.java | 71 ++++++++++----- ...CacheScanPartitionQueryFallbackSelfTest.java | 96 ++++++++++++++++++++ 4 files changed, 261 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45c813af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java new file mode 100644 index 0000000..d824a47 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.affinity.*; + +/** + * Exception that is thrown when a partition reservation failed. + */ +public class GridDhtUnreservedPartitionException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** Partition. */ + private final int part; + + /** Topology version. */ + private final AffinityTopologyVersion topVer; + + /** + * @param part Partition. + * @param topVer Affinity topology version. + * @param msg Message. + */ + public GridDhtUnreservedPartitionException(int part, AffinityTopologyVersion topVer, String msg) { + super(msg); + + this.part = part; + this.topVer = topVer; + } + + /** + * @return Partition. + */ + public int partition() { + return part; + } + + /** + * @return Affinity topology version. + */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return getClass() + " [part=" + part + ", msg=" + getMessage() + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45c813af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 953cb9a..90f9b9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; @@ -457,7 +458,7 @@ public class GridCacheQueryAdapter implements CacheQuery { return (CacheQueryFuture)(loc ? qryMgr.queryFieldsLocal(bean) : qryMgr.queryFieldsDistributed(bean, nodes)); else if (type == SCAN && part != null && nodes.size() > 1) - return new CacheQueryFallbackFuture<>(nodes, bean, qryMgr); + return new CacheQueryFallbackFuture<>(nodes, part, bean, qryMgr, cctx); else return (CacheQueryFuture)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes)); } @@ -554,7 +555,13 @@ public class GridCacheQueryAdapter implements CacheQuery { private volatile GridCacheQueryFutureAdapter fut; /** Backups. */ - private final Queue nodes; + private volatile Queue nodes; + + /** Topology version of the last detected {@link GridDhtUnreservedPartitionException}. */ + private volatile AffinityTopologyVersion unreservedTopVer; + + /** Number of times to retry the query on the nodes failed with {@link GridDhtUnreservedPartitionException}. */ + private volatile int unreservedNodesRetryCnt = 5; /** Bean. */ private final GridCacheQueryBean bean; @@ -562,16 +569,26 @@ public class GridCacheQueryAdapter implements CacheQuery { /** Query manager. */ private final GridCacheQueryManager qryMgr; + /** Cache context. */ + private final GridCacheContext cctx; + + /** Partition. */ + private final int part; + /** * @param nodes Backups. + * @param part Partition. * @param bean Bean. * @param qryMgr Query manager. + * @param cctx Cache context. */ - public CacheQueryFallbackFuture(Collection nodes, GridCacheQueryBean bean, - GridCacheQueryManager qryMgr) { + public CacheQueryFallbackFuture(Collection nodes, int part, GridCacheQueryBean bean, + GridCacheQueryManager qryMgr, GridCacheContext cctx) { this.nodes = fallbacks(nodes); this.bean = bean; this.qryMgr = qryMgr; + this.cctx = cctx; + this.part = part; init(); } @@ -598,7 +615,7 @@ public class GridCacheQueryAdapter implements CacheQuery { */ @SuppressWarnings("unchecked") private void init() { - ClusterNode node = nodes.poll(); + final ClusterNode node = nodes.poll(); GridCacheQueryFutureAdapter fut0 = (GridCacheQueryFutureAdapter)(node.isLocal() ? qryMgr.queryLocal(bean) : @@ -613,8 +630,33 @@ public class GridCacheQueryAdapter implements CacheQuery { onDone(e); } catch (IgniteCheckedException e) { - if (F.isEmpty(nodes)) - onDone(e); + if (e.hasCause(GridDhtUnreservedPartitionException.class)) { + unreservedTopVer = ((GridDhtUnreservedPartitionException)e.getCause()).topologyVersion(); + + assert unreservedTopVer != null; + } + + if (F.isEmpty(nodes)) { + final AffinityTopologyVersion topVer = unreservedTopVer; + + if (topVer != null && --unreservedNodesRetryCnt > 0) { + cctx.affinity().affinityReadyFuture(topVer).listen( + new IgniteInClosure>() { + @Override public void apply( + IgniteInternalFuture future) { + + nodes = fallbacks(cctx.topology().owners(part, topVer)); + + // Race is impossible here because query retries are executed one by one. + unreservedTopVer = null; + + init(); + } + }); + } + else + onDone(e); + } else init(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45c813af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 5d3f6a3..bfe5ecc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -39,6 +39,7 @@ import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.indexing.*; + import org.jetbrains.annotations.*; import org.jsr166.*; @@ -170,7 +171,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte } /** - * Leaves busy state. + * Leaves busy state. */ private void leaveBusy() { busyLock.leaveBusy(); @@ -794,7 +795,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte // double check for owning state if (locPart == null || locPart.state() != OWNING || !locPart.reserve() || locPart.state() != OWNING) - throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved"); + throw new GridDhtUnreservedPartitionException(part, + cctx.affinity().affinityTopologyVersion(), "Partition can not be reserved"); iter = new Iterator() { private Iterator iter0 = locPart.keySet().iterator(); @@ -1083,6 +1085,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte boolean rmvRes = true; + FieldsResult res = null; + final boolean statsEnabled = cctx.config().isStatisticsEnabled(); final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); @@ -1109,7 +1113,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash()); - FieldsResult res = qryInfo.local() ? + res = qryInfo.local() ? executeFieldsQuery(qry, qryInfo.arguments(), qryInfo.local(), qry.subjectId(), taskName, recipient(qryInfo.senderId(), qryInfo.requestId())) : fieldsQueryResult(qryInfo, taskName); @@ -1232,7 +1236,19 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte throw (Error)e; } finally { - if (rmvRes) + if (qryInfo.local()) { + // Don't we need to always remove local iterators? + if (rmvRes && res != null) { + try { + res.closeIfNotShared(recipient(qryInfo.senderId(), qryInfo.requestId())); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" + + cctx.nodeId() + "]", e); + } + } + } + else if (rmvRes) removeFieldsQueryResult(qryInfo.senderId(), qryInfo.requestId()); } } @@ -1260,6 +1276,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte try { boolean loc = qryInfo.local(); + QueryResult res = null; + if (log.isDebugEnabled()) log.debug("Running query: " + qryInfo); @@ -1286,8 +1304,6 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte IgniteSpiCloseableIterator> iter; GridCacheQueryType type; - QueryResult res; - res = loc ? executeQuery(qry, qryInfo.arguments(), loc, qry.subjectId(), taskName, recipient(qryInfo.senderId(), qryInfo.requestId())) : @@ -1350,7 +1366,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte log.debug("Record [key=" + key + ", val=" + val + ", incBackups=" + incBackups + - ", priNode=" + (primaryNode != null ? U.id8(primaryNode.id()) : null) + + ", priNode=" + (primaryNode != null ? U.id8(primaryNode.id()) : null) + ", node=" + U.id8(cctx.localNode().id()) + ']'); } @@ -1496,7 +1512,19 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte throw (Error)e; } finally { - if (rmvIter) + if (loc) { + // Local iterators are always removed. + if (res != null) { + try { + res.closeIfNotShared(recipient(qryInfo.senderId(), qryInfo.requestId())); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" + + cctx.nodeId() + "]", e); + } + } + } + else if (rmvIter) removeQueryResult(qryInfo.senderId(), qryInfo.requestId()); } } @@ -1552,7 +1580,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte * @return Iterator. * @throws IgniteCheckedException In case of error. */ - @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", + @SuppressWarnings({ + "SynchronizationOnLocalVariableOrMethodParameter", "NonPrivateFieldAccessedInSynchronizedContext"}) private QueryResult queryResult(Map>> futs, GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException { @@ -1680,7 +1709,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte * @return Fields query result. * @throws IgniteCheckedException In case of error. */ - @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", + @SuppressWarnings({ + "SynchronizationOnLocalVariableOrMethodParameter", "NonPrivateFieldAccessedInSynchronizedContext"}) private FieldsResult fieldsQueryResult(Map> resMap, GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException { @@ -1868,8 +1898,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte /** * @param Key type. * @param Value type. - * @return Predicate. * @param includeBackups Include backups. + * @return Predicate. */ @SuppressWarnings("unchecked") @Nullable public IndexingQueryFilter backupsFilter(boolean includeBackups) { @@ -1933,7 +1963,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte /** {@inheritDoc} */ @Override public Collection call() { - final GridKernalContext ctx = ((IgniteKernal) ignite).context(); + final GridKernalContext ctx = ((IgniteKernal)ignite).context(); Collection cacheNames = F.viewReadOnly(ctx.cache().caches(), new C1, String>() { @@ -2507,7 +2537,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte if (!filter.apply(key, val)) return null; - return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value())) ; + return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value())); } } @@ -2546,7 +2576,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte idx++; - while(idx < iters.size()) { + while (idx < iters.size()) { iter = iters.get(idx); if (iter.hasNextX()) @@ -2598,7 +2628,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte /** {@inheritDoc} */ @Override public T unwrap(Class clazz) { - if(clazz.isAssignableFrom(getClass())) + if (clazz.isAssignableFrom(getClass())) return clazz.cast(this); throw new IllegalArgumentException(); @@ -2627,7 +2657,6 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte assert res; } - /** * Close if this result does not have any other recipients. * @@ -2958,8 +2987,8 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte } /** - * Creates user's full text query, queried class, and query clause. - * For more information refer to {@link CacheQuery} documentation. + * Creates user's full text query, queried class, and query clause. For more information refer to {@link CacheQuery} + * documentation. * * @param clsName Query class name. * @param search Search clause. @@ -2982,14 +3011,14 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte } /** - * Creates user's SQL fields query for given clause. For more information refer to - * {@link CacheQuery} documentation. + * Creates user's SQL fields query for given clause. For more information refer to {@link CacheQuery} + * documentation. * * @param qry Query. * @param keepPortable Keep portable flag. * @return Created query. */ - public CacheQuery> createSqlFieldsQuery(String qry, boolean keepPortable) { + public CacheQuery> createSqlFieldsQuery(String qry, boolean keepPortable) { A.notNull(qry, "qry"); return new GridCacheQueryAdapter<>(cctx, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/45c813af/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java index 84ceafd..f422e9c 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java @@ -19,9 +19,11 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -157,6 +159,90 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT } /** + * Scan should activate fallback mechanism when new nodes join topology and rebalancing happens in parallel with + * scan query. + * + * @throws Exception In case of error. + */ + public void testScanFallbackOnRebalancing() throws Exception { + cacheMode = CacheMode.PARTITIONED; + clientMode = false; + backups = 1; + commSpiFactory = new TestFallbackOnRebalancingCommunicationSpiFactory(); + + try { + Ignite ignite = startGrids(GRID_CNT); + + final IgniteCacheProxy cache = fillCache(ignite); + + final AtomicBoolean done = new AtomicBoolean(false); + + final AtomicInteger idx = new AtomicInteger(GRID_CNT); + + IgniteInternalFuture fut1 = multithreadedAsync( + new Callable() { + @Override public Object call() throws Exception { + int id = idx.getAndIncrement(); + + while (!done.get()) { + startGrid(id); + Thread.sleep(3000); + + stopGrid(id); + + if (done.get()) + return null; + + Thread.sleep(3000); + } + + return null; + } + }, GRID_CNT); + + final AtomicInteger nodeIdx = new AtomicInteger(); + + IgniteInternalFuture fut2 = multithreadedAsync( + new Callable() { + @Override public Object call() throws Exception { + int nodeId = nodeIdx.getAndIncrement(); + + IgniteCacheProxy cache = (IgniteCacheProxy) + grid(nodeId).cache(null); + + while (!done.get()) { + IgniteBiTuple tup = remotePartition(cache.context()); + + int part = tup.get1(); + + try { + CacheQuery> qry = cache.context().queries().createScanQuery( + null, part, false); + + doTestScanQuery(qry); + } + catch (ClusterGroupEmptyCheckedException e) { + log.warning("Invalid partition: " + part, e); + } + } + + return null; + } + }, GRID_CNT); + + Thread.sleep(60 * 1000); // Test for one minute + + done.set(true); + + fut2.get(); + fut1.get(); + } + finally { + stopAllGrids(); + } + } + + /** * Scan should try first remote node and fallbacks to second remote node. * * @throws Exception If failed. @@ -408,4 +494,14 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT }; } } + + /** + * + */ + private static class TestFallbackOnRebalancingCommunicationSpiFactory implements CommunicationSpiFactory { + /** {@inheritDoc} */ + @Override public TcpCommunicationSpi create() { + return new TcpCommunicationSpi(); + } + } }