Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ED33118796 for ; Fri, 12 Jun 2015 11:17:21 +0000 (UTC) Received: (qmail 4795 invoked by uid 500); 12 Jun 2015 11:17:21 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 4722 invoked by uid 500); 12 Jun 2015 11:17:21 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 4706 invoked by uid 99); 12 Jun 2015 11:17:21 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Jun 2015 11:17:21 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 1A2CEC0DCD for ; Fri, 12 Jun 2015 11:17:21 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id FYTFjdlvxPzT for ; Fri, 12 Jun 2015 11:17:07 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id C989E25320 for ; Fri, 12 Jun 2015 11:16:57 +0000 (UTC) Received: (qmail 2158 invoked by uid 99); 12 Jun 2015 11:16:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Jun 2015 11:16:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A289FE17FA; Fri, 12 Jun 2015 11:16:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.incubator.apache.org Date: Fri, 12 Jun 2015 11:17:01 -0000 Message-Id: <4416d892dd20458c8180e517d4a50da7@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [06/24] incubator-ignite git commit: ignite-sprint-6: merge from ignite-sprint-5 ignite-sprint-6: merge from ignite-sprint-5 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a5d007e3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a5d007e3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a5d007e3 Branch: refs/heads/ignite-gg-10411 Commit: a5d007e323af2d6619c11fb698992c8020d5eefa Parents: d4b9731 Author: Denis Magda Authored: Thu Jun 11 12:34:49 2015 +0300 Committer: Denis Magda Committed: Thu Jun 11 12:34:49 2015 +0300 ---------------------------------------------------------------------- DEVNOTES.txt | 6 + assembly/dependencies-fabric.xml | 1 + examples/pom.xml | 34 ++ modules/core/pom.xml | 1 - .../apache/ignite/IgniteSystemProperties.java | 3 + .../apache/ignite/cache/query/ScanQuery.java | 45 +- .../configuration/CacheConfiguration.java | 1 - .../affinity/GridAffinityAssignmentCache.java | 5 +- .../processors/cache/GridCacheAdapter.java | 15 +- .../GridCachePartitionExchangeManager.java | 2 +- .../processors/cache/GridCacheProcessor.java | 30 +- .../processors/cache/GridCacheSwapManager.java | 55 ++- .../processors/cache/IgniteCacheProxy.java | 11 +- .../processors/cache/QueryCursorImpl.java | 23 +- .../distributed/dht/GridDhtLocalPartition.java | 7 + .../processors/cache/query/CacheQuery.java | 2 +- .../query/GridCacheDistributedQueryManager.java | 3 + .../cache/query/GridCacheQueryAdapter.java | 147 ++++++- .../cache/query/GridCacheQueryManager.java | 209 ++++++---- .../cache/query/GridCacheQueryRequest.java | 47 ++- .../processors/cache/query/QueryCursorEx.java | 8 + .../datastructures/GridCacheSetImpl.java | 4 +- .../processors/query/GridQueryIndexing.java | 4 +- .../processors/query/GridQueryProcessor.java | 18 +- .../service/GridServiceProcessor.java | 2 +- .../ignite/internal/util/GridJavaProcess.java | 2 +- .../ignite/internal/util/IgniteUtils.java | 4 +- .../shmem/IpcSharedMemoryClientEndpoint.java | 2 +- .../ipc/shmem/IpcSharedMemoryNativeLoader.java | 151 ++++++- .../shmem/IpcSharedMemoryServerEndpoint.java | 2 +- .../util/nio/GridShmemCommunicationClient.java | 146 +++++++ .../communication/tcp/TcpCommunicationSpi.java | 415 ++++++++++++++++++- .../tcp/TcpCommunicationSpiMBean.java | 8 + .../cache/GridCacheAbstractFullApiSelfTest.java | 15 + .../cache/IgniteDynamicCacheStartSelfTest.java | 19 + .../distributed/IgniteCacheManyClientsTest.java | 169 ++++++++ .../IgniteCacheMessageRecoveryAbstractTest.java | 1 + ...achePartitionedPreloadLifecycleSelfTest.java | 2 +- ...CacheReplicatedPreloadLifecycleSelfTest.java | 6 +- .../GridCacheSwapScanQueryAbstractSelfTest.java | 112 +++-- .../ipc/shmem/IgfsSharedMemoryTestServer.java | 2 + .../IpcSharedMemoryCrashDetectionSelfTest.java | 2 +- .../ipc/shmem/IpcSharedMemorySpaceSelfTest.java | 2 +- .../ipc/shmem/IpcSharedMemoryUtilsSelfTest.java | 2 +- .../LoadWithCorruptedLibFileTestRunner.java | 2 +- .../IpcSharedMemoryBenchmarkReader.java | 2 +- .../IpcSharedMemoryBenchmarkWriter.java | 2 +- .../communication/GridIoManagerBenchmark0.java | 1 + .../spi/GridTcpSpiForwardingSelfTest.java | 1 + .../GridTcpCommunicationSpiAbstractTest.java | 13 + ...mmunicationSpiConcurrentConnectSelfTest.java | 4 +- ...cpCommunicationSpiMultithreadedSelfTest.java | 21 +- ...pCommunicationSpiMultithreadedShmemTest.java | 28 ++ ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 1 + ...GridTcpCommunicationSpiRecoverySelfTest.java | 1 + .../GridTcpCommunicationSpiShmemSelfTest.java | 38 ++ .../tcp/GridTcpCommunicationSpiTcpSelfTest.java | 7 + .../testsuites/IgniteCacheTestSuite4.java | 2 + .../IgniteSpiCommunicationSelfTestSuite.java | 2 + modules/hadoop/pom.xml | 1 + .../HadoopIgfs20FileSystemAbstractSelfTest.java | 13 + ...oopSecondaryFileSystemConfigurationTest.java | 14 + ...IgniteHadoopFileSystemHandshakeSelfTest.java | 7 + .../IgniteHadoopFileSystemIpcCacheSelfTest.java | 7 + .../hadoop/HadoopAbstractSelfTest.java | 7 + .../processors/query/h2/IgniteH2Indexing.java | 44 +- .../h2/twostep/GridReduceQueryExecutor.java | 8 +- ...CacheScanPartitionQueryFallbackSelfTest.java | 408 ++++++++++++++++++ .../cache/GridCacheCrossCacheQuerySelfTest.java | 12 +- .../cache/IgniteCacheAbstractQuerySelfTest.java | 77 +++- .../IgniteCacheQuerySelfTestSuite.java | 2 + modules/scalar-2.10/README.txt | 4 + modules/scalar-2.10/licenses/apache-2.0.txt | 202 +++++++++ .../scalar-2.10/licenses/scala-bsd-license.txt | 18 + modules/scalar-2.10/pom.xml | 197 +++++++++ modules/spark-2.10/README.txt | 4 + modules/spark-2.10/licenses/apache-2.0.txt | 202 +++++++++ .../spark-2.10/licenses/scala-bsd-license.txt | 18 + modules/spark-2.10/pom.xml | 120 ++++++ modules/spark/README.txt | 8 + modules/spark/licenses/apache-2.0.txt | 202 +++++++++ modules/spark/licenses/scala-bsd-license.txt | 18 + modules/spark/pom.xml | 114 +++++ .../org/apache/ignite/spark/IgniteContext.scala | 119 ++++++ .../org/apache/ignite/spark/IgniteRDD.scala | 244 +++++++++++ .../apache/ignite/spark/JavaIgniteContext.scala | 63 +++ .../org/apache/ignite/spark/JavaIgniteRDD.scala | 99 +++++ .../ignite/spark/impl/IgniteAbstractRDD.scala | 39 ++ .../ignite/spark/impl/IgnitePartition.scala | 24 ++ .../ignite/spark/impl/IgniteQueryIterator.scala | 27 ++ .../apache/ignite/spark/impl/IgniteSqlRDD.scala | 41 ++ .../spark/impl/JavaIgniteAbstractRDD.scala | 34 ++ .../ignite/spark/JavaIgniteRDDSelfTest.java | 298 +++++++++++++ .../scala/org/apache/ignite/spark/Entity.scala | 28 ++ .../org/apache/ignite/spark/IgniteRddSpec.scala | 231 +++++++++++ modules/visor-console-2.10/README.txt | 4 + modules/visor-console-2.10/pom.xml | 174 ++++++++ parent/pom.xml | 4 + pom.xml | 20 +- 99 files changed, 4773 insertions(+), 253 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/DEVNOTES.txt ---------------------------------------------------------------------- diff --git a/DEVNOTES.txt b/DEVNOTES.txt index cd72418..d02e6ba 100644 --- a/DEVNOTES.txt +++ b/DEVNOTES.txt @@ -3,9 +3,15 @@ Ignite Fabric Maven Build Instructions Without LGPL dependencies (default): mvn clean package -DskipTests +Without LGPL dependencies and Scala 2.10: + mvn clean package -DskipTests -Dscala-2.10 + With LGPL dependencies: mvn clean package -DskipTests -Prelease,lgpl +With LGPL dependencies and Scala 2.10: + mvn clean package -DskipTests -Prelease,lgpl -Dscala-2.10 + Look for incubator-ignite--bin.zip in ./target/bin directory. NOTE: JDK version should be 1.7.0-* or >= 1.8.0-u40. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/assembly/dependencies-fabric.xml ---------------------------------------------------------------------- diff --git a/assembly/dependencies-fabric.xml b/assembly/dependencies-fabric.xml index a294243..c6668f6 100644 --- a/assembly/dependencies-fabric.xml +++ b/assembly/dependencies-fabric.xml @@ -113,6 +113,7 @@ org.apache.ignite:ignite-examples org.apache.ignite:ignite-indexing org.apache.ignite:ignite-visor-console + org.apache.ignite:ignite-visor-console_2.10 org.apache.ignite:ignite-visor-plugins org.apache.ignite:ignite-visor-trial org.apache.ignite:ignite-hadoop http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index 78c5852..a775987 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -173,6 +173,40 @@ + scala-2.10 + + + + org.apache.ignite + ignite-scalar_2.10 + ${project.version} + + + + org.scalatest + scalatest_2.10 + 2.2.2 + test + + + org.scala-lang + scala-library + + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + + + java8-examples http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/core/pom.xml b/modules/core/pom.xml index 0460b46..47ed9cb 100644 --- a/modules/core/pom.xml +++ b/modules/core/pom.xml @@ -129,7 +129,6 @@ org.gridgain ignite-shmem 1.0.0 - test http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 439ea2d..b166f39 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -337,6 +337,9 @@ public final class IgniteSystemProperties { */ public static final String IGNITE_SQL_MERGE_TABLE_MAX_SIZE = "IGNITE_SQL_MERGE_TABLE_MAX_SIZE"; + /** Maximum size for affinity assignment history. */ + public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE"; + /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java index 688eb2e..e6b69bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java @@ -36,11 +36,23 @@ public final class ScanQuery extends Query> { /** */ private IgniteBiPredicate filter; + /** */ + private Integer part; + /** * Create scan query returning all entries. */ public ScanQuery() { - this(null); + this(null, null); + } + + /** + * Creates partition scan query returning all entries for given partition. + * + * @param part Partition. + */ + public ScanQuery(int part) { + this(part, null); } /** @@ -49,6 +61,17 @@ public final class ScanQuery extends Query> { * @param filter Filter. If {@code null} then all entries will be returned. */ public ScanQuery(@Nullable IgniteBiPredicate filter) { + this(null, filter); + } + + /** + * Create scan query with filter. + * + * @param part Partition. + * @param filter Filter. If {@code null} then all entries will be returned. + */ + public ScanQuery(@Nullable Integer part, @Nullable IgniteBiPredicate filter) { + setPartition(part); setFilter(filter); } @@ -73,6 +96,26 @@ public final class ScanQuery extends Query> { return this; } + /** + * Gets partition number over which this query should iterate. Will return {@code null} if partition was not + * set. In this case query will iterate over all partitions in the cache. + * + * @return Partition number or {@code null}. + */ + @Nullable public Integer getPartition() { + return part; + } + + /** + * Sets partition number over which this query should iterate. If {@code null}, query will iterate over + * all partitions in the cache. Must be in the range [0, N) where N is partition number in the cache. + * + * @param part Partition number over which this query should iterate. + */ + public void setPartition(@Nullable Integer part) { + this.part = part; + } + /** {@inheritDoc} */ @Override public ScanQuery setPageSize(int pageSize) { return (ScanQuery)super.setPageSize(pageSize); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 1aa4fd6..a16438c 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -774,7 +774,6 @@ public class CacheConfiguration extends MutableConfiguration { * * @param loadPrevVal Load previous value flag. * @return {@code this} for chaining. - * @return {@code this} for chaining. */ public CacheConfiguration setLoadPreviousValue(boolean loadPrevVal) { this.loadPrevVal = loadPrevVal; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 47f222e..6989385 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -408,9 +408,10 @@ public class GridAffinityAssignmentCache { throw new IllegalStateException("Getting affinity for topology version earlier than affinity is " + "calculated [locNodeId=" + ctx.localNodeId() + ", cache=" + cacheName + - ", history=" + affCache.keySet() + ", topVer=" + topVer + - ", head=" + head.get().topologyVersion() + ']'); + ", head=" + head.get().topologyVersion() + + ", history=" + affCache.keySet() + + ']'); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index d8d029e..2ca7687 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1337,8 +1337,8 @@ public abstract class GridCacheAdapter implements IgniteInternalCache loadedKeys = new GridConcurrentHashSet<>(); - IgniteInternalFuture readFut = - readThroughAllAsync(absentKeys, true, skipVals, null, subjId, taskName, new CI2() { + IgniteInternalFuture readFut = readThroughAllAsync(absentKeys, true, skipVals, null, + subjId, taskName, new CI2() { /** Version for all loaded entries. */ private GridCacheVersion nextVer = ctx.versions().next(); @@ -1968,7 +1968,8 @@ public abstract class GridCacheAdapter implements IgniteInternalCache getAndPutAsync0(final K key, final V val, @Nullable final CacheEntryPredicate... filter) { + public IgniteInternalFuture getAndPutAsync0(final K key, final V val, + @Nullable final CacheEntryPredicate... filter) { A.notNull(key, "key", val, "val"); if (keyCheck) @@ -3137,7 +3138,8 @@ public abstract class GridCacheAdapter implements IgniteInternalCache keys, long timeout) throws IgniteCheckedException { + @Override public boolean lockAll(@Nullable Collection keys, long timeout) + throws IgniteCheckedException { if (F.isEmpty(keys)) return true; @@ -3711,7 +3713,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache> fut = ctx0.queries().createScanQuery(null, ctx.keepPortable()) + CacheQueryFuture> fut = ctx0.queries().createScanQuery(null, null, ctx.keepPortable()) .keepAll(false) .execute(); @@ -3944,7 +3946,8 @@ public abstract class GridCacheAdapter implements IgniteInternalCache extends GridCacheSharedMana private static final int EXCHANGE_HISTORY_SIZE = 1000; /** Cleanup history size. */ - public static final int EXCH_FUT_CLEANUP_HISTORY_SIZE = 10; + public static final int EXCH_FUT_CLEANUP_HISTORY_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 100); /** Atomic reference for pending timeout object. */ private AtomicReference pendingResend = new AtomicReference<>(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 871cd77..5582ba7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1962,7 +1962,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.cacheType(cacheType); - return F.first(initiateCacheChanges(F.asList(req))); + return F.first(initiateCacheChanges(F.asList(req), failIfExists)); } /** @@ -1972,14 +1972,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { public IgniteInternalFuture dynamicStopCache(String cacheName) { DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId(), true); - return F.first(initiateCacheChanges(F.asList(t))); + return F.first(initiateCacheChanges(F.asList(t), false)); } /** * @param reqs Requests. * @return Collection of futures. */ - public Collection initiateCacheChanges(Collection reqs) { + @SuppressWarnings("TypeMayBeWeakened") + private Collection initiateCacheChanges(Collection reqs, + boolean failIfExists) { Collection res = new ArrayList<>(reqs.size()); Collection sndReqs = new ArrayList<>(reqs.size()); @@ -2012,9 +2014,23 @@ public class GridCacheProcessor extends GridProcessorAdapter { maskNull(req.cacheName()), fut); if (old != null) { - if (req.start() && !req.clientStartOnly()) { - fut.onDone(new CacheExistsException("Failed to start cache " + - "(a cache with the same name is already being started or stopped): " + req.cacheName())); + if (req.start()) { + if (!req.clientStartOnly()) { + if (failIfExists) + fut.onDone(new CacheExistsException("Failed to start cache " + + "(a cache with the same name is already being started or stopped): " + + req.cacheName())); + else { + fut = old; + + continue; + } + } + else { + fut = old; + + continue; + } } else { fut = old; @@ -2664,7 +2680,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.clientStartOnly(true); - F.first(initiateCacheChanges(F.asList(req))).get(); + F.first(initiateCacheChanges(F.asList(req), false)).get(); IgniteCacheProxy cache = jCacheProxies.get(masked); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 772e849..d0d9049 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -1251,7 +1251,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { checkIteratorQueue(); if (offHeapEnabled() && !swapEnabled()) - return rawOffHeapIterator(true, true); + return rawOffHeapIterator(null, true, true); if (swapEnabled() && !offHeapEnabled()) return rawSwapIterator(true, true); @@ -1267,7 +1267,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { private Map.Entry cur; { - it = rawOffHeapIterator(true, true); + it = rawOffHeapIterator(null, true, true); advance(); } @@ -1598,11 +1598,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @param c Key/value closure. + * @param part Partition. * @param primary Include primaries. * @param backup Include backups. * @return Off-heap iterator. */ public GridCloseableIterator rawOffHeapIterator(final CX2, T2, T> c, + Integer part, boolean primary, boolean backup) { @@ -1618,24 +1620,31 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion(); - Set parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : - cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + Set parts; + + if (part == null) + parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : + cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + else + parts = Collections.singleton(part); return new CloseablePartitionsIterator(parts) { @Override protected GridCloseableIterator partitionIterator(int part) - throws IgniteCheckedException - { + throws IgniteCheckedException { return offheap.iterator(spaceName, c, part); } }; } /** + * + * @param part Partition. * @param primary Include primaries. * @param backup Include backups. * @return Raw off-heap iterator. */ - public GridCloseableIterator> rawOffHeapIterator(final boolean primary, + public GridCloseableIterator> rawOffHeapIterator(@Nullable Integer part, + final boolean primary, final boolean backup) { if (!offheapEnabled || (!primary && !backup)) @@ -1673,8 +1682,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion(); - Set parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : - cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + Set parts; + + if (part == null) + parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : + cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + else + parts = Collections.singleton(part); return new CloseablePartitionsIterator, IgniteBiTuple>(parts) { private Map.Entry cur; @@ -1751,6 +1765,29 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** + * @param part Partition. + * @return Raw off-heap iterator. + * @throws IgniteCheckedException If failed. + */ + public GridCloseableIterator> rawSwapIterator(int part) + throws IgniteCheckedException + { + if (!swapEnabled) + return new GridEmptyCloseableIterator<>(); + + checkIteratorQueue(); + + return new CloseablePartitionsIterator, Map.Entry>( + Collections.singleton(part)) { + @Override protected GridCloseableIterator> partitionIterator(int part) + throws IgniteCheckedException + { + return swapMgr.rawIterator(spaceName, part); + } + }; + } + + /** * @param primary If {@code true} includes primary entries. * @param backup If {@code true} includes backup entries. * @param topVer Topology version. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 4390993..69ce7b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -353,7 +353,8 @@ public class IgniteCacheProxy extends AsyncSupportAdapter p = ((ScanQuery)filter).getFilter(); - qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, isKeepPortable); + qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, ((ScanQuery)filter).getPartition(), + isKeepPortable); if (grp != null) qry.projection(grp); @@ -496,10 +497,14 @@ public class IgniteCacheProxy extends AsyncSupportAdapter)queryContinuous((ContinuousQuery)qry, qry.isLocal()); if (qry instanceof SqlQuery) { - SqlQuery p = (SqlQuery)qry; + final SqlQuery p = (SqlQuery)qry; if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal()) - return (QueryCursor)new QueryCursorImpl<>(ctx.kernalContext().query().queryLocal(ctx, p)); + return (QueryCursor)new QueryCursorImpl<>(new Iterable>() { + @Override public Iterator> iterator() { + return ctx.kernalContext().query().queryLocal(ctx, p); + } + }); return (QueryCursor)ctx.kernalContext().query().queryTwoStep(ctx, p); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java index 7cb9efc..d68c377 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java @@ -27,6 +27,9 @@ import java.util.*; * Query cursor implementation. */ public class QueryCursorImpl implements QueryCursorEx { + /** Query executor. */ + private Iterable iterExec; + /** */ private Iterator iter; @@ -34,18 +37,18 @@ public class QueryCursorImpl implements QueryCursorEx { private boolean iterTaken; /** */ - private Collection fieldsMeta; + private List fieldsMeta; /** - * @param iter Iterator. + * @param iterExec Query executor. */ - public QueryCursorImpl(Iterator iter) { - this.iter = iter; + public QueryCursorImpl(Iterable iterExec) { + this.iterExec = iterExec; } /** {@inheritDoc} */ @Override public Iterator iterator() { - if (iter == null) + if (iter == null && iterTaken) throw new IgniteException("Cursor is closed."); if (iterTaken) @@ -53,12 +56,16 @@ public class QueryCursorImpl implements QueryCursorEx { iterTaken = true; + iter = iterExec.iterator(); + + assert iter != null; + return iter; } /** {@inheritDoc} */ @Override public List getAll() { - ArrayList all = new ArrayList<>(); + List all = new ArrayList<>(); try { for (T t : this) // Implicitly calls iterator() to do all checks. @@ -103,14 +110,14 @@ public class QueryCursorImpl implements QueryCursorEx { /** * @param fieldsMeta SQL Fields query result metadata. */ - public void fieldsMeta(Collection fieldsMeta) { + public void fieldsMeta(List fieldsMeta) { this.fieldsMeta = fieldsMeta; } /** * @return SQL Fields query result metadata. */ - public Collection fieldsMeta() { + @Override public List fieldsMeta() { return fieldsMeta; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 0749f66..8ac3809 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -149,6 +149,13 @@ public class GridDhtLocalPartition implements Comparable } /** + * @return Keys belonging to partition. + */ + public Set keySet() { + return map.keySet(); + } + + /** * @return Entries belonging to partition. */ public Collection entries() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java index 0658828..2d2db1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java @@ -76,7 +76,7 @@ import org.jetbrains.annotations.*; * *
  • * Joins will work correctly only if joined objects are stored in - * collocated mode or at least one side of the join is stored in + * colocated mode or at least one side of the join is stored in * {@link org.apache.ignite.cache.CacheMode#REPLICATED} cache. Refer to * {@link AffinityKey} javadoc for more information about colocation. *
  • http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index a579aab..2b93144 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -229,6 +229,7 @@ public class GridCacheDistributedQueryManager extends GridCacheQueryManage false, null, req.keyValueFilter(), + req.partition(), req.className(), req.clause(), req.includeMetaData(), @@ -518,6 +519,7 @@ public class GridCacheDistributedQueryManager extends GridCacheQueryManage qry.query().clause(), clsName, qry.query().scanFilter(), + qry.query().partition(), qry.reducer(), qry.transform(), qry.query().pageSize(), @@ -626,6 +628,7 @@ public class GridCacheDistributedQueryManager extends GridCacheQueryManage qry.query().clause(), null, null, + null, qry.reducer(), qry.transform(), qry.query().pageSize(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 7f0a5ec..5b82c34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -21,13 +21,17 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.query.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.security.*; + import org.jetbrains.annotations.*; import java.util.*; @@ -38,6 +42,13 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy * Query adapter. */ public class GridCacheQueryAdapter implements CacheQuery { + /** Is local node predicate. */ + private static final IgnitePredicate IS_LOC_NODE = new IgnitePredicate() { + @Override public boolean apply(ClusterNode n) { + return n.isLocal(); + } + }; + /** */ private final GridCacheContext cctx; @@ -56,6 +67,9 @@ public class GridCacheQueryAdapter implements CacheQuery { /** */ private final IgniteBiPredicate filter; + /** Partition. */ + private Integer part; + /** */ private final boolean incMeta; @@ -95,6 +109,7 @@ public class GridCacheQueryAdapter implements CacheQuery { * @param clsName Class name. * @param clause Clause. * @param filter Scan filter. + * @param part Partition. * @param incMeta Include metadata flag. * @param keepPortable Keep portable flag. */ @@ -103,16 +118,19 @@ public class GridCacheQueryAdapter implements CacheQuery { @Nullable String clsName, @Nullable String clause, @Nullable IgniteBiPredicate filter, + @Nullable Integer part, boolean incMeta, boolean keepPortable) { assert cctx != null; assert type != null; + assert part == null || part >= 0; this.cctx = cctx; this.type = type; this.clsName = clsName; this.clause = clause; this.filter = filter; + this.part = part; this.incMeta = incMeta; this.keepPortable = keepPortable; @@ -132,6 +150,7 @@ public class GridCacheQueryAdapter implements CacheQuery { * @param dedup Enable dedup flag. * @param prj Grid projection. * @param filter Key-value filter. + * @param part Partition. * @param clsName Class name. * @param clause Clause. * @param incMeta Include metadata flag. @@ -149,6 +168,7 @@ public class GridCacheQueryAdapter implements CacheQuery { boolean dedup, ClusterGroup prj, IgniteBiPredicate filter, + @Nullable Integer part, @Nullable String clsName, String clause, boolean incMeta, @@ -165,6 +185,7 @@ public class GridCacheQueryAdapter implements CacheQuery { this.dedup = dedup; this.prj = prj; this.filter = filter; + this.part = part; this.clsName = clsName; this.clause = clause; this.incMeta = incMeta; @@ -334,6 +355,13 @@ public class GridCacheQueryAdapter implements CacheQuery { } /** + * @return Partition. + */ + @Nullable public Integer partition() { + return part; + } + + /** * @throws IgniteCheckedException If query is invalid. */ public void validate() throws IgniteCheckedException { @@ -410,16 +438,18 @@ public class GridCacheQueryAdapter implements CacheQuery { taskHash = cctx.kernalContext().job().currentTaskNameHash(); - GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer)rmtReducer, + final GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer)rmtReducer, (IgniteClosure)rmtTransform, args); - GridCacheQueryManager qryMgr = cctx.queries(); + final GridCacheQueryManager qryMgr = cctx.queries(); boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId()); if (type == SQL_FIELDS || type == SPI) return (CacheQueryFuture)(loc ? qryMgr.queryFieldsLocal(bean) : qryMgr.queryFieldsDistributed(bean, nodes)); + else if (type == SCAN && part != null && nodes.size() > 1) + return new CacheQueryFallbackFuture<>(nodes, bean, qryMgr); else return (CacheQueryFuture)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes)); } @@ -439,15 +469,15 @@ public class GridCacheQueryAdapter implements CacheQuery { return Collections.singletonList(cctx.localNode()); case REPLICATED: - if (prj != null) - return nodes(cctx, prj); + if (prj != null || partition() != null) + return nodes(cctx, prj, partition()); return cctx.affinityNode() ? Collections.singletonList(cctx.localNode()) : - Collections.singletonList(F.rand(nodes(cctx, null))); + Collections.singletonList(F.rand(nodes(cctx, null, partition()))); case PARTITIONED: - return nodes(cctx, prj); + return nodes(cctx, prj, partition()); default: throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode); @@ -459,17 +489,26 @@ public class GridCacheQueryAdapter implements CacheQuery { * @param prj Projection (optional). * @return Collection of data nodes in provided projection (if any). */ - private static Collection nodes(final GridCacheContext cctx, @Nullable final ClusterGroup prj) { + private static Collection nodes(final GridCacheContext cctx, + @Nullable final ClusterGroup prj, @Nullable final Integer part) { assert cctx != null; + final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); + Collection affNodes = CU.affinityNodes(cctx); - if (prj == null) + if (prj == null && part == null) return affNodes; + final Set owners = + part == null ? Collections.emptySet() : new HashSet<>(cctx.topology().owners(part, topVer)); + return F.view(affNodes, new P1() { @Override public boolean apply(ClusterNode n) { - return prj.node(n.id()) != null; + + return cctx.discovery().cacheAffinityNode(n, cctx.name()) && + (prj == null || prj.node(n.id()) != null) && + (part == null || owners.contains(n)); } }); } @@ -478,4 +517,94 @@ public class GridCacheQueryAdapter implements CacheQuery { @Override public String toString() { return S.toString(GridCacheQueryAdapter.class, this); } + + /** + * Wrapper for queries with fallback. + */ + private static class CacheQueryFallbackFuture extends GridFutureAdapter> + implements CacheQueryFuture { + /** Query future. */ + private volatile GridCacheQueryFutureAdapter fut; + + /** Backups. */ + private final Queue nodes; + + /** Bean. */ + private final GridCacheQueryBean bean; + + /** Query manager. */ + private final GridCacheQueryManager qryMgr; + + /** + * @param nodes Backups. + * @param bean Bean. + * @param qryMgr Query manager. + */ + public CacheQueryFallbackFuture(Collection nodes, GridCacheQueryBean bean, + GridCacheQueryManager qryMgr) { + this.nodes = fallbacks(nodes); + this.bean = bean; + this.qryMgr = qryMgr; + + init(); + } + + /** + * @param nodes Nodes. + */ + private Queue fallbacks(Collection nodes) { + Queue fallbacks = new LinkedList<>(); + + ClusterNode node = F.first(F.view(nodes, IS_LOC_NODE)); + + if (node != null) + fallbacks.add(node); + + fallbacks.addAll(node != null ? F.view(nodes, F.not(IS_LOC_NODE)) : nodes); + + return fallbacks; + } + + /** + * + */ + private void init() { + ClusterNode node = nodes.poll(); + + GridCacheQueryFutureAdapter fut0 = + (GridCacheQueryFutureAdapter)(node.isLocal() ? qryMgr.queryLocal(bean) : + qryMgr.queryDistributed(bean, Collections.singleton(node))); + + fut0.listen(new IgniteInClosure>>() { + @Override public void apply(IgniteInternalFuture> fut) { + try { + onDone(fut.get()); + } + catch (IgniteCheckedException e) { + if (F.isEmpty(nodes)) + onDone(e); + else + init(); + } + } + }); + + fut = fut0; + } + + /** {@inheritDoc} */ + @Override public int available() { + return fut.available(); + } + + /** {@inheritDoc} */ + @Override public boolean cancel() throws IgniteCheckedException { + return fut.cancel(); + } + + /** {@inheritDoc} */ + @Override public R next() { + return fut.next(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 32e9d63..6e71ba7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -52,6 +52,8 @@ import java.util.concurrent.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.GridClosureCallMode.*; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*; /** @@ -111,8 +113,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte final Object recipient = recipient(nodeId, entry.getKey()); entry.getValue().listen(new CIX1>>() { - @Override - public void applyx(IgniteInternalFuture> f) + @Override public void applyx(IgniteInternalFuture> f) throws IgniteCheckedException { f.get().closeIfNotShared(recipient); } @@ -768,98 +769,138 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte final boolean backups = qry.includeBackups() || cctx.isReplicated(); - final GridCloseableIteratorAdapter> heapIt = new GridCloseableIteratorAdapter>() { - private IgniteBiTuple next; + final GridCloseableIteratorAdapter> heapIt = + new GridCloseableIteratorAdapter>() { + private IgniteBiTuple next; - private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc); + private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc); - private Iterator iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator(); + private Iterator iter; - { - advance(); - } + private GridDhtLocalPartition locPart; - @Override public boolean onHasNext() { - return next != null; - } + { + Integer part = qry.partition(); - @Override public IgniteBiTuple onNext() { - if (next == null) - throw new NoSuchElementException(); + if (part == null || dht == null) + iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator(); + else if (part < 0 || part >= cctx.affinity().partitions()) + iter = F.emptyIterator(); + else { + AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); - IgniteBiTuple next0 = next; + locPart = dht.topology().localPartition(part, topVer, false); - advance(); + if (locPart == null || (locPart.state() != OWNING && locPart.state() != RENTING) || + !locPart.reserve()) + throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved"); - return next0; - } + iter = new Iterator() { + private Iterator iter0 = locPart.keySet().iterator(); - private void advance() { - IgniteBiTuple next0 = null; - - while (iter.hasNext()) { - next0 = null; + @Override public boolean hasNext() { + return iter0.hasNext(); + } - K key = iter.next(); + @Override public K next() { + KeyCacheObject key = iter0.next(); - V val; + return key.value(cctx.cacheObjectContext(), false); + } - try { - val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc); + @Override public void remove() { + iter0.remove(); + } + }; } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to peek value: " + e); - val = null; - } + advance(); + } - if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) { - dht.sendTtlUpdateRequest(expiryPlc); + @Override public boolean onHasNext() { + return next != null; + } - expiryPlc = cctx.cache().expiryPolicy(plc); - } + @Override public IgniteBiTuple onNext() { + if (next == null) + throw new NoSuchElementException(); - if (val != null) { - next0 = F.t(key, val); + IgniteBiTuple next0 = next; - if (checkPredicate(next0)) - break; - else - next0 = null; - } + advance(); + + return next0; } - next = next0 != null ? - new IgniteBiTuple<>(next0.getKey(), next0.getValue()) : - null; + private void advance() { + IgniteBiTuple next0 = null; - if (next == null) - sendTtlUpdate(); - } + while (iter.hasNext()) { + next0 = null; - @Override protected void onClose() { - sendTtlUpdate(); - } + K key = iter.next(); + + V val; - private void sendTtlUpdate() { - if (dht != null && expiryPlc != null) { - dht.sendTtlUpdateRequest(expiryPlc); + try { + val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to peek value: " + e); + + val = null; + } - expiryPlc = null; + if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) { + dht.sendTtlUpdateRequest(expiryPlc); + + expiryPlc = cctx.cache().expiryPolicy(plc); + } + + if (val != null) { + next0 = F.t(key, val); + + if (checkPredicate(next0)) + break; + else + next0 = null; + } + } + + next = next0 != null ? + new IgniteBiTuple<>(next0.getKey(), next0.getValue()) : + null; + + if (next == null) + sendTtlUpdate(); } - } - private boolean checkPredicate(Map.Entry e) { - if (keyValFilter != null) { - Map.Entry e0 = (Map.Entry)cctx.unwrapPortableIfNeeded(e, qry.keepPortable()); + @Override protected void onClose() { + sendTtlUpdate(); - return keyValFilter.apply(e0.getKey(), e0.getValue()); + if (locPart != null) + locPart.release(); } - return true; - } - }; + private void sendTtlUpdate() { + if (dht != null && expiryPlc != null) { + dht.sendTtlUpdateRequest(expiryPlc); + + expiryPlc = null; + } + } + + private boolean checkPredicate(Map.Entry e) { + if (keyValFilter != null) { + Map.Entry e0 = (Map.Entry)cctx.unwrapPortableIfNeeded(e, qry.keepPortable()); + + return keyValFilter.apply(e0.getKey(), e0.getValue()); + } + + return true; + } + }; final GridIterator> it; @@ -914,7 +955,10 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte throws IgniteCheckedException { IgniteBiPredicate filter = qry.scanFilter(); - Iterator> it = cctx.swap().rawSwapIterator(true, backups); + Integer part = qry.partition(); + + Iterator> it = part == null ? cctx.swap().rawSwapIterator(true, backups) : + cctx.swap().rawSwapIterator(part); return scanIterator(it, filter, qry.keepPortable()); } @@ -930,10 +974,10 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte if (cctx.offheapTiered() && filter != null) { OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepPortable()); - return cctx.swap().rawOffHeapIterator(c, true, backups); + return cctx.swap().rawOffHeapIterator(c, qry.partition(), true, backups); } else { - Iterator> it = cctx.swap().rawOffHeapIterator(true, backups); + Iterator> it = cctx.swap().rawOffHeapIterator(qry.partition(), true, backups); return scanIterator(it, filter, qry.keepPortable()); } @@ -1222,7 +1266,9 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte try { // Preparing query closures. - IgniteClosure, Object> trans = (IgniteClosure, Object>)qryInfo.transformer(); + IgniteClosure, Object> trans = + (IgniteClosure, Object>)qryInfo.transformer(); + IgniteReducer, Object> rdc = (IgniteReducer, Object>)qryInfo.reducer(); injectResources(trans); @@ -1282,9 +1328,11 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte K key = row.getKey(); - // Filter backups for SCAN queries. Other types are filtered in indexing manager. - if (!cctx.isReplicated() && cctx.config().getCacheMode() != LOCAL && qry.type() == SCAN && - !incBackups && !cctx.affinity().primary(cctx.localNode(), key, topVer)) { + // Filter backups for SCAN queries, if it isn't partition scan. + // Other types are filtered in indexing manager. + if (!cctx.isReplicated() && qry.type() == SCAN && qry.partition() == null && + cctx.config().getCacheMode() != LOCAL && !incBackups && + !cctx.affinity().primary(cctx.localNode(), key, topVer)) { if (log.isDebugEnabled()) log.debug("Ignoring backup element [row=" + row + ", cacheMode=" + cctx.config().getCacheMode() + ", incBackups=" + incBackups + @@ -1529,11 +1577,6 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte fut.onDone(executeQuery(qryInfo.query(), qryInfo.arguments(), false, qryInfo.query().subjectId(), taskName, recipient(qryInfo.senderId(), qryInfo.requestId()))); } - catch (Error e) { - fut.onDone(e); - - throw e; - } catch (Throwable e) { fut.onDone(e); @@ -1843,7 +1886,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte return new IgniteBiPredicate() { @Override public boolean apply(K k, V v) { - return cache.context().affinity().primary(ctx.discovery().localNode(), k, AffinityTopologyVersion.NONE); + return cache.context().affinity().primary(ctx.discovery().localNode(), k, NONE); } }; } @@ -2920,6 +2963,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte null, null, null, + null, false, keepPortable); } @@ -2928,17 +2972,19 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte * Creates user's predicate based scan query. * * @param filter Scan filter. + * @param part Partition. * @param keepPortable Keep portable flag. * @return Created query. */ - @SuppressWarnings("unchecked") public CacheQuery> createScanQuery(@Nullable IgniteBiPredicate filter, - boolean keepPortable) { + @Nullable Integer part, boolean keepPortable) { + return new GridCacheQueryAdapter<>(cctx, SCAN, null, null, (IgniteBiPredicate)filter, + part, false, keepPortable); } @@ -2962,6 +3008,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte clsName, search, null, + null, false, keepPortable); } @@ -2982,6 +3029,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte null, qry, null, + null, false, keepPortable); } @@ -3002,6 +3050,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte null, qry, null, + null, incMeta, keepPortable); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index 845077f..2113e7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -26,6 +26,8 @@ import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; + import java.io.*; import java.nio.*; import java.util.*; @@ -109,6 +111,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache /** */ private int taskHash; + /** Partition. */ + private int part; + /** * Required by {@link Externalizable} */ @@ -173,6 +178,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache * @param clause Query clause. * @param clsName Query class name. * @param keyValFilter Key-value filter. + * @param part Partition. * @param rdc Reducer. * @param trans Transformer. * @param pageSize Page size. @@ -189,6 +195,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache String clause, String clsName, IgniteBiPredicate keyValFilter, + @Nullable Integer part, IgniteReducer rdc, IgniteClosure trans, int pageSize, @@ -211,6 +218,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache this.clause = clause; this.clsName = clsName; this.keyValFilter = keyValFilter; + this.part = part == null ? -1 : part; this.rdc = rdc; this.trans = trans; this.pageSize = pageSize; @@ -414,6 +422,13 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache return taskHash; } + /** + * @return partition. + */ + @Nullable public Integer partition() { + return part == -1 ? null : part; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -508,30 +523,36 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache writer.incrementState(); case 16: - if (!writer.writeByteArray("rdcBytes", rdcBytes)) + if (!writer.writeInt("part", part)) return false; writer.incrementState(); case 17: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeByteArray("rdcBytes", rdcBytes)) return false; writer.incrementState(); case 18: - if (!writer.writeInt("taskHash", taskHash)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 19: - if (!writer.writeByteArray("transBytes", transBytes)) + if (!writer.writeInt("taskHash", taskHash)) return false; writer.incrementState(); case 20: + if (!writer.writeByteArray("transBytes", transBytes)) + return false; + + writer.incrementState(); + + case 21: if (!writer.writeByte("type", type != null ? (byte)type.ordinal() : -1)) return false; @@ -658,7 +679,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 16: - rdcBytes = reader.readByteArray("rdcBytes"); + part = reader.readInt("part"); if (!reader.isLastRead()) return false; @@ -666,7 +687,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 17: - subjId = reader.readUuid("subjId"); + rdcBytes = reader.readByteArray("rdcBytes"); if (!reader.isLastRead()) return false; @@ -674,7 +695,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 18: - taskHash = reader.readInt("taskHash"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -682,7 +703,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 19: - transBytes = reader.readByteArray("transBytes"); + taskHash = reader.readInt("taskHash"); if (!reader.isLastRead()) return false; @@ -690,6 +711,14 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 20: + transBytes = reader.readByteArray("transBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 21: byte typeOrd; typeOrd = reader.readByte("type"); @@ -713,7 +742,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 21; + return 22; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java index bf1d4ea..5e19b99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryCursorEx.java @@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.query; import org.apache.ignite.*; import org.apache.ignite.cache.query.*; +import org.apache.ignite.internal.processors.query.*; + +import java.util.*; /** * Extended query cursor interface allowing for "getAll" to output data into destination other than Collection. @@ -32,6 +35,11 @@ public interface QueryCursorEx extends QueryCursor { public void getAll(Consumer c) throws IgniteCheckedException; /** + * @return Query metadata. + */ + public List fieldsMeta(); + + /** * Query value consumer. */ public static interface Consumer { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index f516968..f74fe95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -114,7 +114,7 @@ public class GridCacheSetImpl extends AbstractCollection implements Ignite } CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, - new GridSetQueryPredicate<>(id, collocated), false, false); + new GridSetQueryPredicate<>(id, collocated), null, false, false); Collection nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); @@ -345,7 +345,7 @@ public class GridCacheSetImpl extends AbstractCollection implements Ignite private GridCloseableIterator iterator0() { try { CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, - new GridSetQueryPredicate<>(id, collocated), false, false); + new GridSetQueryPredicate<>(id, collocated), null, false, false); Collection nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 0bb820d..7fcc284 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -58,9 +58,11 @@ public interface GridQueryIndexing { * * @param cctx Cache context. * @param qry Query. + * @param keepCacheObjects If {@code true}, cache objects representation will be preserved. * @return Cursor. */ - public QueryCursor> queryTwoStep(GridCacheContext cctx, GridCacheTwoStepQuery qry); + public Iterable> queryTwoStep(GridCacheContext cctx, GridCacheTwoStepQuery qry, + boolean keepCacheObjects); /** * Parses SQL query into two step query and executes it. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index ed8e1e2..e187713 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -539,16 +539,19 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param qry Query. * @return Cursor. */ - public QueryCursor> queryTwoStep(String space, GridCacheTwoStepQuery qry) { + public Iterable> queryTwoStep(String space, GridCacheTwoStepQuery qry) { checkxEnabled(); if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { + GridCacheContext cacheCtx = ctx.cache().internalCache(space).context(); + return idx.queryTwoStep( - ctx.cache().internalCache(space).context(), - qry); + cacheCtx, + qry, + cacheCtx.keepPortable()); } finally { busyLock.leaveBusy(); @@ -715,12 +718,15 @@ public class GridQueryProcessor extends GridProcessorAdapter { String sql = qry.getSql(); Object[] args = qry.getArgs(); - GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter()); + final GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter()); sendQueryExecutedEvent(sql, args); - QueryCursorImpl> cursor = new QueryCursorImpl<>( - new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable())); + QueryCursorImpl> cursor = new QueryCursorImpl<>(new Iterable>() { + @Override public Iterator> iterator() { + return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable()); + } + }); cursor.fieldsMeta(res.metaData()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 64eb1c1..bb451c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -934,7 +934,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { GridCacheQueryManager qryMgr = cache.context().queries(); - CacheQuery> qry = qryMgr.createScanQuery(p, false); + CacheQuery> qry = qryMgr.createScanQuery(p, null, false); qry.keepAll(false); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java index 42fe089..4946eb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java @@ -138,7 +138,7 @@ public final class GridJavaProcess { procCommands.add(javaBin); procCommands.addAll(jvmArgs == null ? U.jvmArgs() : jvmArgs); - if (!jvmArgs.contains("-cp") && !jvmArgs.contains("-classpath")) { + if (jvmArgs == null || (!jvmArgs.contains("-cp") && !jvmArgs.contains("-classpath"))) { String classpath = System.getProperty("java.class.path"); String sfcp = System.getProperty("surefire.test.class.path"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 0932212..9016b10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -9025,11 +9025,11 @@ public abstract class IgniteUtils { hasShmem = false; else { try { - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(null); hasShmem = true; } - catch (IgniteCheckedException e) { + catch (IgniteCheckedException ignore) { hasShmem = false; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java index 27a234f..c935c4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java @@ -112,7 +112,7 @@ public class IpcSharedMemoryClientEndpoint implements IpcEndpoint { boolean clear = true; try { - IpcSharedMemoryNativeLoader.load(); + IpcSharedMemoryNativeLoader.load(log); sock.connect(new InetSocketAddress("127.0.0.1", port), timeout);