ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptupit...@apache.org
Subject [ignite] branch master updated: IGNITE-12882 .NET: Use platform Near Cache for local ScanQuery
Date Fri, 10 Apr 2020 05:21:11 GMT
This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e0a846c  IGNITE-12882 .NET: Use platform Near Cache for local ScanQuery
e0a846c is described below

commit e0a846cb7cde548e3d8d625c8c1159adf3968f30
Author: Pavel Tupitsyn <ptupitsyn@apache.org>
AuthorDate: Fri Apr 10 08:20:50 2020 +0300

    IGNITE-12882 .NET: Use platform Near Cache for local ScanQuery
    
    Execute `ScanQuery` with `Local = true` and non-null `Partition` on .NET side when platform
Near Cache is available, avoiding Java roundtrip. ~40x perf improvement.
    
    Reserves partition same way as Java does in this case, so that correctness is guaranteed.
---
 .../processors/platform/cache/PlatformCache.java   |  48 ++++-
 .../platform/PlatformIsPartitionReservedTask.java  |  96 +++++++++
 .../Cache/Near/CacheNearTest.cs                    | 223 +++++++++++++++++++++
 .../Cache/Near/ScanQueryNearCacheFilter.cs         |  18 ++
 .../Log/CustomLoggerTest.cs                        |   9 +
 .../Apache.Ignite.Core/Apache.Ignite.Core.csproj   |   1 +
 .../Cache/Configuration/CacheConfiguration.cs      |  12 +-
 .../PlatformNearCacheConfiguration.cs              |  22 ++
 .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs     |  68 +++++++
 .../Apache.Ignite.Core/Impl/Cache/CacheOp.cs       |   4 +-
 .../Impl/Cache/Near/INearCache.cs                  |   3 +-
 .../Impl/Cache/Near/NearCache.cs                   |   7 +-
 .../Impl/Cache/Query/FieldsQueryCursor.cs          |   1 -
 .../Impl/Cache/Query/NearQueryCursor.cs            | 166 +++++++++++++++
 14 files changed, 662 insertions(+), 16 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index f40830a..ca41ff0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -47,9 +47,13 @@ import org.apache.ignite.cache.query.TextQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
@@ -81,7 +85,7 @@ import static org.apache.ignite.internal.processors.platform.client.ClientConnec
 /**
  * Native cache wrapper implementation.
  */
-@SuppressWarnings({"unchecked", "WeakerAccess"})
+@SuppressWarnings({"unchecked", "WeakerAccess", "rawtypes"})
 public class PlatformCache extends PlatformAbstractTarget {
     /** */
     public static final int OP_CLEAR = 1;
@@ -361,6 +365,12 @@ public class PlatformCache extends PlatformAbstractTarget {
 
     /** */
     private static final int OP_PUT_WITH_NEAR = 95;
+    
+    /** */
+    private static final int OP_RESERVE_PARTITION = 96;
+
+    /** */
+    private static final int OP_RELEASE_PARTITION = 97;
 
     /** Underlying JCache in binary mode. */
     private final IgniteCacheProxy cache;
@@ -1205,6 +1215,23 @@ public class PlatformCache extends PlatformAbstractTarget {
                 cache.clearStatistics();
 
                 return TRUE;
+
+            case OP_RESERVE_PARTITION: {
+                GridDhtLocalPartition locPart = getLocalPartition((int)val);
+
+                return locPart != null && locPart.reserve() ? TRUE : FALSE;
+            }
+
+            case OP_RELEASE_PARTITION: {
+                GridDhtLocalPartition locPart = getLocalPartition((int)val);
+
+                if (locPart != null) {
+                    locPart.release();
+                    return TRUE;
+                }
+
+                return FALSE;
+            }
         }
         return super.processInLongOutLong(type, val);
     }
@@ -1633,6 +1660,25 @@ public class PlatformCache extends PlatformAbstractTarget {
     }
 
     /**
+     * Gets local partition.
+     *
+     * @param part Partition id.
+     * @return Partition when local, null otherwise.
+     */
+    private GridDhtLocalPartition getLocalPartition(int part) throws IgniteCheckedException
{
+        GridCacheContext cctx = cache.context();
+
+        if (part < 0 || part >= cctx.affinity().partitions())
+            throw new IgniteCheckedException("Invalid partition number: " + part);
+
+        GridDhtPartitionTopology top = cctx.topology();
+
+        AffinityTopologyVersion ver = top.readyTopologyVersion();
+
+        return top.localPartition(part, ver, false);
+    }
+
+    /**
      * Writes error with EntryProcessorException cause.
      */
     private static class GetAllWriter implements PlatformFutureUtils.Writer {
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformIsPartitionReservedTask.java
b/modules/core/src/test/java/org/apache/ignite/platform/PlatformIsPartitionReservedTask.java
new file mode 100644
index 0000000..47b80bf
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformIsPartitionReservedTask.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Task that checks whether the specified cache partition is currently reserved on the local node.
+ */
+public class PlatformIsPartitionReservedTask extends ComputeTaskAdapter<Object[], Boolean>
{
+    /** {@inheritDoc} */
+    @NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode>
subgrid,
+        @Nullable Object[] arg) {
+        //noinspection OptionalGetWithoutIsPresent
+        ClusterNode localNode = subgrid.stream().filter(ClusterNode::isLocal).findFirst().get();
+
+        return Collections.singletonMap(
+                new PlatformIsPartitionReservedJob((String)arg[0], (Integer)arg[1]), localNode);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) {
+        return results.get(0).getData();
+    }
+
+    /**
+     * Job.
+     */
+    private static class PlatformIsPartitionReservedJob extends ComputeJobAdapter {
+        /** */
+        private final String cacheName;
+
+        /** */
+        private final int part;
+
+        /** */
+        @SuppressWarnings("unused")
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /**
+         * Constructor.
+         *
+         * @param cacheName Cache name.
+         * @param part Partition.
+         */
+        public PlatformIsPartitionReservedJob(String cacheName, Integer part) {
+            this.cacheName = cacheName;
+            this.part = part;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Boolean execute() {
+            GridKernalContext ctx = ((IgniteEx) ignite).context();
+
+            GridDhtPartitionTopology top = ctx.cache().cache(cacheName).context().topology();
+
+            GridDhtLocalPartition locPart = top.localPartition(part, top.readyTopologyVersion(),
false);
+
+            assert locPart != null;
+
+            return locPart.reservations() > 0;
+        }
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Near/CacheNearTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Near/CacheNearTest.cs
index ab4c643..d372758 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Near/CacheNearTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Near/CacheNearTest.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Near
     using System;
     using System.Collections.Generic;
     using System.Linq;
+    using System.Security;
     using System.Threading;
     using System.Threading.Tasks;
     using System.Transactions;
@@ -30,6 +31,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Near
     using Apache.Ignite.Core.Cache.Expiry;
     using Apache.Ignite.Core.Cache.Query;
     using Apache.Ignite.Core.Cache.Store;
+    using Apache.Ignite.Core.Common;
     using Apache.Ignite.Core.Datastream;
     using Apache.Ignite.Core.Events;
     using Apache.Ignite.Core.Impl;
@@ -574,6 +576,9 @@ namespace Apache.Ignite.Core.Tests.Cache.Near
             Assert.AreEqual(11, clientCache[1]);
         }
 
+        /// <summary>
+        /// Tests that scan query uses near cache to pass values to <see cref="ScanQuery{TK,TV}.Filter"/>
when possible.
+        /// </summary>
         [Test]
         public void TestScanQueryFilterUsesValueFromNearCache(
             [Values(CacheTestMode.ServerLocal, CacheTestMode.ServerRemote, CacheTestMode.Client)]
CacheTestMode mode)
@@ -594,6 +599,9 @@ namespace Apache.Ignite.Core.Tests.Cache.Near
             Assert.AreEqual(count, res.Count());
         }
 
+        /// <summary>
+        /// Tests that scan query falls back to deserialized value from Java when near cache
value is missing.
+        /// </summary>
         [Test]
         public void TestScanQueryFilterUsesFallbackValueWhenNotInNearCache(
             [Values(CacheTestMode.ServerLocal, CacheTestMode.ServerRemote, CacheTestMode.Client)]
CacheTestMode mode)
@@ -628,6 +636,211 @@ namespace Apache.Ignite.Core.Tests.Cache.Near
         }
 
         /// <summary>
+        /// Tests that local scan query uses near cache directly, avoiding Java roundtrip.

+        /// </summary>
+        [Test]
+        public void TestLocalScanQueryUsesKeysAndValuesFromNearCache([Values(true, false)]
bool withFilter,
+            [Values(true, false)] bool withPartition)
+        {
+            var cache = GetCache<int, Foo>(CacheTestMode.ServerLocal);
+            cache.PutAll(Enumerable.Range(1, 100).ToDictionary(x => x, x => new Foo(x)));
+
+            var qry = new ScanQuery<int, Foo>
+            {
+                Local = true,
+                Filter = withFilter 
+                    ? new ScanQueryNearCacheFilter
+                    {
+                        CacheName = cache.Name
+                    }
+                    : null,
+                Partition = withPartition 
+                    ? _grid.GetAffinity(cache.Name).GetPartition(TestUtils.GetPrimaryKey(_grid,
cache.Name)) 
+                    : (int?) null
+            };
+            
+            var res = cache.Query(qry);
+
+            foreach (var entry in res)
+            {
+                var localValue = cache.LocalPeek(entry.Key, CachePeekMode.PlatformNear);
+                
+                if (withPartition)
+                {
+                    // Local scan with partition works directly through platform cache.
+                    Assert.AreSame(entry.Value, localValue);
+                }
+                else
+                {
+                    // Local scan without partition works through Java.
+                    Assert.AreNotSame(entry.Value, localValue);
+                }
+            }
+
+            if (withPartition)
+            {
+                Assert.Throws<ObjectDisposedException>(() => res.GetAll());
+            }
+        }
+
+        /// <summary>
+        /// Tests that local scan query reserves the partition when <see cref="ScanQuery{TK,TV}.Partition"/>
is set. 
+        /// </summary>
+        [Test]
+        public void TestLocalScanQueryWithPartitionReservesPartitionAndReleasesItOnDispose()
+        {
+            var cache = GetCache<int, Foo>(CacheTestMode.ServerLocal);
+            
+            var key = TestUtils.GetPrimaryKey(_grid, cache.Name);
+            var part = _grid.GetAffinity(cache.Name).GetPartition(key);
+
+            cache.PutAll(Enumerable.Range(1, 100).ToDictionary(x => x, x => new Foo(x)));
+            
+            var qry = new ScanQuery<int, Foo>
+            {
+                Local = true,
+                Partition = part
+            };
+
+            Func<bool> isReserved = () => IsPartitionReserved(_grid, cache.Name,
part);
+            
+            Assert.IsFalse(isReserved());
+
+            // Full iteration.
+            using (var cursor = cache.Query(qry))
+            {
+                Assert.IsTrue(isReserved());
+
+                using (var enumerator = cursor.GetEnumerator())
+                {
+                    Assert.IsTrue(isReserved());
+                    
+                    while (enumerator.MoveNext())
+                    {
+                        Assert.IsTrue(isReserved());
+                    }
+                    
+                    Assert.IsFalse(isReserved());
+                }
+                
+                Assert.IsFalse(isReserved());
+            }
+            
+            Assert.IsFalse(isReserved());
+            
+            // Partial iteration with LINQ.
+            using (var cursor = cache.Query(qry))
+            {
+                Assert.IsTrue(isReserved());
+
+                var item = cursor.FirstOrDefault();
+                Assert.IsNotNull(item);
+
+                // Released because LINQ disposes the iterator. 
+                Assert.IsFalse(isReserved());
+            }
+            
+            // Partial iteration.
+            using (var cursor = cache.Query(qry))
+            {
+                Assert.IsTrue(isReserved());
+
+                var moved = cursor.GetEnumerator().MoveNext();
+                Assert.IsTrue(moved);
+
+                Assert.IsTrue(isReserved());
+            }
+            
+            Assert.IsFalse(isReserved());
+            
+            // GetAll without using block.
+            using (var cursor = cache.Query(qry))
+            {
+                Assert.IsTrue(isReserved());
+
+                var res = cursor.GetAll();
+                Assert.IsNotEmpty(res);
+
+                Assert.IsFalse(isReserved());
+            }
+            
+            // Exception in filter.
+            qry.Filter = new ScanQueryNearCacheFilter {FailKey = key};
+            
+            using (var cursor = cache.Query(qry))
+            {
+                Assert.IsTrue(isReserved());
+
+                Assert.Throws<SecurityException>(() => cursor.GetAll());
+
+                Assert.IsFalse(isReserved());
+            }
+        }
+
+        /// <summary>
+        /// Tests that invalid <see cref="ScanQuery{TK,TV}.Partition"/> causes correct
exception.
+        /// </summary>
+        [Test]
+        public void TestLocalScanQueryWithInvalidPartitionId()
+        {
+            var cache = GetCache<int, Foo>(CacheTestMode.ServerLocal);
+            var qry = new ScanQuery<int, Foo> {Local = true, Partition = 1024};
+            
+            var ex = Assert.Throws<IgniteException>(() => cache.Query(qry));
+            
+            Assert.AreEqual("Invalid partition number: 1024", ex.Message);
+        }
+
+        /// <summary>
+        /// Tests that local scan query throws an exception when <see cref="ScanQuery{TK,TV}.Partition"/>
is specified,
+        /// but that partition can not be reserved (belongs to remote node).
+        /// </summary>
+        [Test]
+        public void TestLocalScanQueryWithPartitionThrowsOnRemoteKeys()
+        {
+            var cache = GetCache<int, Foo>(CacheTestMode.ServerLocal);
+            
+            var partition = _grid2.GetAffinity(cache.Name)
+                .GetPrimaryPartitions(_grid2.GetCluster().GetLocalNode())
+                .First();
+            
+            var qry = new ScanQuery<int, Foo>
+            {
+                Local = true,
+                Partition = partition
+            };
+
+            var ex = Assert.Throws<InvalidOperationException>(() => cache.Query(qry).GetAll());
+            
+            Assert.AreEqual(
+                string.Format("Failed to reserve partition {0}, it does not belong to the
local node.", partition), 
+                ex.Message);
+        }
+        
+        /// <summary>
+        /// Tests local scan query on client node.
+        /// </summary>
+        [Test]
+        public void TestLocalScanQueryFromClientNode()
+        {
+            var cache = _grid.CreateCache<int, Foo>(TestUtils.TestName);
+            cache.PutAll(Enumerable.Range(1, 100).ToDictionary(x => x, x => new Foo(x)));
+
+            var clientCache = _client.CreateNearCache<int, Foo>(cache.Name, new NearCacheConfiguration(),
+                new PlatformNearCacheConfiguration());
+            
+            // Promote key to near cache.
+            clientCache.Get(2);
+            
+            var res = clientCache.Query(new ScanQuery<int, Foo> {Local = true}).GetAll();
+
+            // Local scan on client node returns empty collection.
+            Assert.AreEqual(1, clientCache.GetLocalSize(CachePeekMode.Near));
+            Assert.AreEqual(1, clientCache.GetLocalSize(CachePeekMode.PlatformNear));
+            Assert.IsEmpty(res);
+        }
+
+        /// <summary>
         /// Tests that expiry policy functionality plays well with platform near cache.
         /// </summary>
         [Test]
@@ -1360,6 +1573,16 @@ namespace Apache.Ignite.Core.Tests.Cache.Near
             TestUtils.WaitForTrueCondition(() => _grid2.GetAffinity(CacheName).MapKeyToNode(1).IsLocal,
2000);
         }
 
+        /// <summary>
+        /// Gets a value indicating whether specified partition is reserved.
+        /// </summary>
+        private static bool IsPartitionReserved(IIgnite ignite, string cacheName, int part)
+        {
+            const string taskName = "org.apache.ignite.platform.PlatformIsPartitionReservedTask";
+
+            return ignite.GetCompute().ExecuteJavaTask<bool>(taskName, new object[]
{cacheName, part});
+        }
+
         /** */
         public enum CacheTestMode
         {
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Near/ScanQueryNearCacheFilter.cs
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Near/ScanQueryNearCacheFilter.cs
index 91eb092..4a394e4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Near/ScanQueryNearCacheFilter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Near/ScanQueryNearCacheFilter.cs
@@ -17,6 +17,7 @@
 
 namespace Apache.Ignite.Core.Tests.Cache.Near
 {
+    using System.Security;
     using Apache.Ignite.Core.Cache;
     using Apache.Ignite.Core.Resource;
     using NUnit.Framework;
@@ -26,13 +27,30 @@ namespace Apache.Ignite.Core.Tests.Cache.Near
     /// </summary>
     public class ScanQueryNearCacheFilter : ICacheEntryFilter<int, Foo>
     {
+        /// <summary>
+        /// Gets or sets the cache name.
+        /// </summary>
         public string CacheName { get; set; }
         
+        /// <summary>
+        /// Gets or sets the key that should cause an exception in <see cref="Invoke"/>.

+        /// </summary>
+        public int? FailKey { get; set; }
+        
+        /// <summary>
+        /// Injected Ignite.
+        /// </summary>
         [InstanceResource]
         public IIgnite Ignite { get; set; }
         
+        /** <inheritdoc /> */
         public bool Invoke(ICacheEntry<int, Foo> entry)
         {
+            if (entry.Key == FailKey)
+            {
+                throw new SecurityException("Crash in filter");
+            }
+            
             var cache = Ignite.GetCache<int, Foo>(CacheName);
             var nearVal = cache.LocalPeek(entry.Key, CachePeekMode.PlatformNear);
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Log/CustomLoggerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Log/CustomLoggerTest.cs
index 543287e..1594fae 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Log/CustomLoggerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Log/CustomLoggerTest.cs
@@ -449,5 +449,14 @@ namespace Apache.Ignite.Core.Tests.Log
                 throw new ArithmeticException("Error in func.");
             }
         }
+
+        /// <summary>
+        /// Custom struct for testing.
+        /// </summary>
+        private struct CustomEnum
+        {
+            // ReSharper disable once UnusedMember.Local
+            public int Field { get; set; }
+        }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 193068e..38a7b97 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -92,6 +92,7 @@
     <Compile Include="Impl\Cache\Near\NearCacheManager.cs" />
     <Compile Include="Impl\Cache\QueryMetricsImpl.cs" />
     <Compile Include="Impl\Cache\Query\QueryCursorField.cs" />
+    <Compile Include="Impl\Cache\Query\NearQueryCursor.cs" />
     <Compile Include="Impl\Client\ClientContextBase.cs" />
     <Compile Include="Impl\Client\ClientOpExtensions.cs" />
     <Compile Include="Impl\Client\ClientRequestContext.cs" />
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs
index b6eb992..46b2f08 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs
@@ -950,17 +950,7 @@ namespace Apache.Ignite.Core.Cache.Configuration
 
         /// <summary>
         /// Gets or sets platform near cache configuration.
-        /// <para />
-        /// Enables native .NET near cache when not null. Cache entries will be stored in
deserialized form in
-        /// CLR heap.
-        /// <para />
-        /// When enabled on server nodes, all primary keys will be stored in platform memory
as well.
-        /// <para />
-        /// Same eviction policy applies to near cache entries for all keys on client nodes
and
-        /// non-primary keys on server nodes.
-        /// <para />
-        /// Enabling this can greatly improve performance for key-value operations and scan
queries,
-        /// at the expense of RAM usage.
+        /// More details: <see cref="PlatformNearCacheConfiguration"/>.
         /// </summary>
         public PlatformNearCacheConfiguration PlatformNearConfiguration { get; set; }
     }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/PlatformNearCacheConfiguration.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/PlatformNearCacheConfiguration.cs
index ee22fab..c771d66 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/PlatformNearCacheConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/PlatformNearCacheConfiguration.cs
@@ -18,9 +18,31 @@
 namespace Apache.Ignite.Core.Cache.Configuration
 {
     using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Cache.Query;
 
     /// <summary>
     /// Native .NET near cache configuration.
+    /// <para />
+    /// Enables native .NET near cache. Cache entries will be stored in deserialized form
in CLR heap.
+    /// <para />
+    /// When enabled on server nodes, all primary keys will be stored in platform memory
as well.
+    /// <para />
+    /// Same eviction policy applies to near cache entries for all keys on client nodes and
+    /// non-primary keys on server nodes.
+    /// <para />
+    /// Enabling this can greatly improve performance for key-value operations and scan queries,
+    /// at the expense of RAM usage.
+    /// <para />
+    /// Supported operations (async counterparts included):
+    /// <list type="bullet">
+    /// <item><term><see cref="ICache{TK,TV}.Get"/>, <see cref="ICache{TK,TV}.TryGet"/></term></item>
+    /// <item><term><see cref="ICache{TK,TV}.GetAll"/></term></item>
+    /// <item><term><see cref="ICache{TK,TV}.ContainsKey"/>, <see cref="ICache{TK,TV}.ContainsKeys"/></term></item>
+    /// <item><term><see cref="ICache{TK,TV}.LocalPeek"/>, <see cref="ICache{TK,TV}.TryLocalPeek"/></term></item>
+    /// <item><term><see cref="ICache{TK,TV}.GetLocalEntries"/></term></item>
+    /// <item><term><see cref="ICache{TK,TV}.GetLocalSize"/></term></item>
+    /// <item><term><see cref="ICache{TK,TV}.Query(QueryBase)"/> with <see
cref="ScanQuery{TK,TV}"/></term></item>
+    /// </list>
     /// </summary>
     public class PlatformNearCacheConfiguration
     {
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index 80b8851..8521dc8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -40,6 +40,7 @@ namespace Apache.Ignite.Core.Impl.Cache
     using Apache.Ignite.Core.Impl.Client;
     using Apache.Ignite.Core.Impl.Cluster;
     using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Impl.Resource;
     using Apache.Ignite.Core.Impl.Transactions;
     using BinaryReader = Apache.Ignite.Core.Impl.Binary.BinaryReader;
     using BinaryWriter = Apache.Ignite.Core.Impl.Binary.BinaryWriter;
@@ -1647,6 +1648,19 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(qry, "qry");
 
+            if (IsNear)
+            {
+                // NOTE: Users can pass a ScanQuery that has different generic arguments.
+                // We do not support this scenario for near cache scan optimization.
+                var scan = qry as ScanQuery<TK, TV>;
+
+                // Local scan with Partition can be satisfied directly from platform cache
on server nodes.
+                if (scan != null && scan.Local && scan.Partition != null)
+                {
+                    return ScanNear(scan);
+                }
+            }
+
             var cursor = DoOutOpObject((int) qry.OpId, writer => qry.Write(writer, IsKeepBinary));
 
             return new QueryCursor<TK, TV>(cursor, _flagKeepBinary);
@@ -2043,5 +2057,59 @@ namespace Apache.Ignite.Core.Impl.Cache
             writer.WriteInt(count);
             writer.Stream.Seek(endPos, SeekOrigin.Begin);
         }
+
+        /// <summary>
+        /// Reserves specified partition.
+        /// </summary>
+        private void ReservePartition(int part)
+        {
+            var reserved = Target.InLongOutLong((int) CacheOp.ReservePartition, part) ==
True;
+
+            if (!reserved)
+            {
+                // Java exception for Scan Query in this case is 'No queryable nodes for
partition N',
+                // which is a bit confusing.
+                throw new InvalidOperationException(
+                    string.Format("Failed to reserve partition {0}, it does not belong to
the local node.", part));
+            }
+        }
+
+        /// <summary>
+        /// Releases specified partition.
+        /// </summary>
+        private void ReleasePartition(int part)
+        {
+            var released = Target.InLongOutLong((int) CacheOp.ReleasePartition, part) ==
True;
+
+            if (!released)
+            {
+                throw new InvalidOperationException("Failed to release partition: " + part);
+            }
+        }
+
+        /// <summary>
+        /// Performs Scan query over Near Cache.
+        /// </summary>
+        private IQueryCursor<ICacheEntry<TK, TV>> ScanNear(ScanQuery<TK, TV>
qry)
+        {
+            var filter = qry.Filter;
+
+            if (filter != null)
+            {
+                ResourceProcessor.Inject(filter, Marshaller.Ignite);
+            }
+
+            var part = qry.Partition;
+            Action dispose = null;
+
+            if (part != null)
+            {
+                ReservePartition((int) part);
+                
+                dispose = () => ReleasePartition((int) part);
+            }
+
+            return new NearQueryCursor<TK, TV>(_nearCache, filter, part, dispose);
+        }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs
index b1b077b..93c658a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs
@@ -115,6 +115,8 @@ namespace Apache.Ignite.Core.Impl.Cache
         SizeLongLoc = 92,
         EnableStatistics = 93,
         ClearStatistics = 94,
-        PutWithNear = 95
+        PutWithNear = 95,
+        ReservePartition = 96,
+        ReleasePartition = 97
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Near/INearCache.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Near/INearCache.cs
index 912bd50..175167d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Near/INearCache.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Near/INearCache.cs
@@ -77,6 +77,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Near
         /// <summary>
         /// Gets all entries.
         /// </summary>
-        IEnumerable<ICacheEntry<TK,TV>> GetEntries<TK, TV>();
+        /// <param name="partition">Optional partition id; when not null, only entries from that partition are returned.</param>
+        IEnumerable<ICacheEntry<TK, TV>> GetEntries<TK, TV>(int? partition
= null);
     }
 }
\ No newline at end of file
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Near/NearCache.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Near/NearCache.cs
index 64b96dc..152e2e1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Near/NearCache.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Near/NearCache.cs
@@ -199,7 +199,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Near
         }
 
         /** <inheritdoc /> */
-        public IEnumerable<ICacheEntry<TKey, TVal>> GetEntries<TKey, TVal>()
+        public IEnumerable<ICacheEntry<TKey, TVal>> GetEntries<TKey, TVal>(int?
partition)
         {
             if (_stopped)
             {
@@ -208,6 +208,11 @@ namespace Apache.Ignite.Core.Impl.Cache.Near
 
             foreach (var e in _map)
             {
+                if (partition != null && e.Value.Partition != partition)
+                {
+                    continue;
+                }
+
                 if (!IsValid(e.Value))
                 {
                     continue;
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs
index 8cb3430..a309515 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs
@@ -20,7 +20,6 @@ namespace Apache.Ignite.Core.Impl.Cache.Query
     using System;
     using System.Collections.Generic;
     using System.Collections.ObjectModel;
-    using System.Linq;
     using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cache.Query;
     using Apache.Ignite.Core.Impl.Binary;
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/NearQueryCursor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/NearQueryCursor.cs
new file mode 100644
index 0000000..8c76622
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/NearQueryCursor.cs
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Query
+{
+    using System;
+    using System.Collections;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.Linq;
+    using Apache.Ignite.Core.Cache;
+    using Apache.Ignite.Core.Cache.Query;
+    using Apache.Ignite.Core.Impl.Cache.Near;
+
+    /// <summary>
+    /// Single-use query cursor that serves entries from the platform near cache, with optional partition and entry filter.
+    /// </summary>
+    internal sealed class NearQueryCursor<TK, TV> : IQueryCursor<ICacheEntry<TK,
TV>>
+    {
+        /** Near cache that supplies the entries. */
+        private readonly INearCache _nearCache;
+        
+        /** Optional action invoked once when the cursor is disposed or finalized; may be null. */
+        private readonly Action _dispose;
+
+        /** Optional entry filter; when null, every entry from the near cache is returned. */
+        private readonly ICacheEntryFilter<TK, TV> _filter;
+        
+        /** Optional partition passed to the near cache when entries are requested. */
+        private readonly int? _partition;
+
+        /** Whether this cursor has been disposed (explicitly, after enumeration, or by the finalizer). */
+        private bool _disposed;
+
+        /** Whether enumeration has already been requested; a second request throws. */
+        private bool _iterCalled;
+
+        /// <summary>
+        /// Initializes a new instance of <see cref="NearQueryCursor{TK, TV}"/>.
+        /// </summary>
+        /// <param name="nearCache">Near cache to read entries from; must not be null.</param>
+        /// <param name="filter">Optional entry filter; null accepts all entries.</param>
+        /// <param name="partition">Optional partition passed to the near cache.</param>
+        /// <param name="dispose">Optional action to run once on dispose or finalization.</param>
+        internal NearQueryCursor(INearCache nearCache, ICacheEntryFilter<TK, TV> filter
= null, 
+            int? partition = null, Action dispose = null)
+        {
+            Debug.Assert(nearCache != null);
+            
+            _nearCache = nearCache;
+            _filter = filter;
+            _partition = partition;
+            _dispose = dispose;
+
+            if (_dispose == null) // No dispose action to run, so the finalizer is not needed.
+            {
+                GC.SuppressFinalize(this);
+            }
+        }
+
+        /** <inheritdoc /> */
+        public IList<ICacheEntry<TK, TV>> GetAll()
+        {
+            return GetEnumerable().ToList();
+        }
+
+        /** <inheritdoc /> */
+        public IEnumerator<ICacheEntry<TK, TV>> GetEnumerator()
+        {
+            return GetEnumerable().GetEnumerator();
+        }
+
+        /** <inheritdoc /> */
+        IEnumerator IEnumerable.GetEnumerator()
+        {
+            return GetEnumerator();
+        }
+
+        /** <inheritdoc /> */
+        public void Dispose()
+        {
+            ReleaseUnmanagedResources();
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Checks that the cursor is neither disposed nor already enumerated, marks it as enumerated, returns the entries.
+        /// </summary>
+        private IEnumerable<ICacheEntry<TK, TV>> GetEnumerable()
+        {
+            if (_disposed)
+            {
+                throw new ObjectDisposedException(GetType().Name, 
+                    "Object has been disposed. Query cursor can not be enumerated multiple
times.");
+            }
+            
+            if (_iterCalled)
+            {
+                throw new InvalidOperationException("Query cursor can not be enumerated multiple
times.");
+            }
+            
+            _iterCalled = true;
+
+            return GetEnumerableInternal();
+        }
+
+        /// <summary>
+        /// Lazily yields near cache entries that pass the filter (if any); calls Dispose in a finally block.
+        /// </summary>
+        private IEnumerable<ICacheEntry<TK, TV>> GetEnumerableInternal()
+        {
+            try
+            {
+                foreach (var entry in _nearCache.GetEntries<TK, TV>(_partition))
+                {
+                    if (_filter == null || _filter.Invoke(entry))
+                    {
+                        yield return entry;
+                    }
+                }
+            }
+            finally
+            {
+                Dispose();
+            }
+        }
+
+        /// <summary>
+        /// Marks the cursor as disposed and runs the dispose action (if any); later calls do nothing.
+        /// </summary>
+        private void ReleaseUnmanagedResources()
+        {
+            if (!_disposed)
+            {
+                _disposed = true;
+
+                if (_dispose != null)
+                {
+                    _dispose();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Finalizer: runs the release logic when Dispose has not suppressed finalization.
+        /// </summary>
+        ~NearQueryCursor()
+        {
+            ReleaseUnmanagedResources();
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message