ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [12/45] ignite git commit: IGNITE-1348: Moved GridGain's .Net module to Ignite.
Date Sat, 05 Sep 2015 02:31:55 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs
new file mode 100644
index 0000000..55bc76c
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAbstractTest.cs
@@ -0,0 +1,1181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous
+{
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Diagnostics.CodeAnalysis;
+    using System.Linq;
+    using System.Runtime.Serialization;
+    using System.Threading;
+    using Apache.Ignite.Core.Cache;
+    using Apache.Ignite.Core.Cache.Event;
+    using Apache.Ignite.Core.Cache.Query;
+    using Apache.Ignite.Core.Cache.Query.Continuous;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Impl;
+    using Apache.Ignite.Core.Portable;
+    using Apache.Ignite.Core.Resource;
+    using NUnit.Framework;
+    using CQU = Apache.Ignite.Core.Impl.Cache.Query.Continuous.ContinuousQueryUtils;
+
+    /// <summary>
+    /// Tests for continuous query.
+    /// </summary>
+    [SuppressMessage("ReSharper", "InconsistentNaming")]
+    [SuppressMessage("ReSharper", "PossibleNullReferenceException")]
+    [SuppressMessage("ReSharper", "StaticMemberInGenericType")]
+    public abstract class ContinuousQueryAbstractTest
+    {
+        /** Cache name: ATOMIC, backup. */
+        protected const string CACHE_ATOMIC_BACKUP = "atomic_backup";
+
+        /** Cache name: ATOMIC, no backup. */
+        protected const string CACHE_ATOMIC_NO_BACKUP = "atomic_no_backup";
+
+        /** Cache name: TRANSACTIONAL, backup. */
+        protected const string CACHE_TX_BACKUP = "transactional_backup";
+
+        /** Cache name: TRANSACTIONAL, no backup. */
+        protected const string CACHE_TX_NO_BACKUP = "transactional_no_backup";
+
+        /** Callback (listener) events; replaced with a fresh collection before every test. */
+        public static BlockingCollection<CallbackEvent> CB_EVTS = new BlockingCollection<CallbackEvent>();
+
+        /** Filter events; replaced with a fresh collection before every test. */
+        public static BlockingCollection<FilterEvent> FILTER_EVTS = new BlockingCollection<FilterEvent>();
+
+        /** First node ("grid-1"). */
+        private IIgnite grid1;
+
+        /** Second node ("grid-2"). */
+        private IIgnite grid2;
+
+        /** Cache on the first node. */
+        private ICache<int, PortableEntry> cache1;
+
+        /** Cache on the second node. */
+        private ICache<int, PortableEntry> cache2;
+
+        /** Name of the cache under test; supplied through the constructor. */
+        private readonly string cacheName;
+        
+        /// <summary>
+        /// Creates the fixture for a particular cache.
+        /// </summary>
+        /// <param name="cacheName">Name of the cache that both nodes will look up in <c>SetUp</c>.</param>
+        protected ContinuousQueryAbstractTest(string cacheName)
+        {
+            this.cacheName = cacheName;
+        }
+
+        /// <summary>
+        /// Fixture set-up: starts two Ignite nodes ("grid-1" and "grid-2") from one shared
+        /// configuration object and fetches the cache under test from each of them.
+        /// </summary>
+        [TestFixtureSetUp]
+        public void SetUp()
+        {
+            GC.Collect();
+            TestUtils.JvmDebug = true;
+
+            var cfg = new IgniteConfigurationEx();
+            var portCfg = new PortableConfiguration();
+
+            // Portable types registered for this fixture.
+            ICollection<PortableTypeConfiguration> portTypeCfgs = new List<PortableTypeConfiguration>
+            {
+                new PortableTypeConfiguration(typeof(PortableEntry)),
+                new PortableTypeConfiguration(typeof(PortableFilter)),
+                new PortableTypeConfiguration(typeof(KeepPortableFilter))
+            };
+
+            portCfg.TypeConfigurations = portTypeCfgs;
+            cfg.PortableConfiguration = portCfg;
+
+            cfg.JvmClasspath = TestUtils.CreateTestClasspath();
+            cfg.JvmOptions = TestUtils.TestJavaOptions();
+            cfg.SpringConfigUrl = "config\\cache-query-continuous.xml";
+
+            // First node.
+            cfg.GridName = "grid-1";
+            grid1 = Ignition.Start(cfg);
+            cache1 = grid1.Cache<int, PortableEntry>(cacheName);
+
+            // Second node: same configuration object, only the grid name differs.
+            cfg.GridName = "grid-2";
+            grid2 = Ignition.Start(cfg);
+            cache2 = grid2.Cache<int, PortableEntry>(cacheName);
+        }
+
+        /// <summary>
+        /// Fixture tear-down: stops every Ignite node via <c>Ignition.StopAll(true)</c>.
+        /// </summary>
+        [TestFixtureTearDown]
+        public void TearDown()
+        {
+            Ignition.StopAll(true);
+        }
+
+        /// <summary>
+        /// Per-test preparation: resets the shared event queues and filter flags, removes the
+        /// two well-known keys and verifies that both caches are empty.
+        /// </summary>
+        [SetUp]
+        public void BeforeTest()
+        {
+            // Fresh queues so events from a previous test cannot leak into this one.
+            CB_EVTS = new BlockingCollection<CallbackEvent>();
+            FILTER_EVTS = new BlockingCollection<FilterEvent>();
+
+            // Reset the static AbstractFilter flags that individual tests toggle.
+            AbstractFilter<PortableEntry>.res = true;
+            AbstractFilter<PortableEntry>.err = false;
+            AbstractFilter<PortableEntry>.marshErr = false;
+            AbstractFilter<PortableEntry>.unmarshErr = false;
+
+            // Drop the keys most tests write to (one primary key per node).
+            cache1.Remove(PrimaryKey(cache1));
+            cache1.Remove(PrimaryKey(cache2));
+
+            Assert.AreEqual(0, cache1.Size());
+            Assert.AreEqual(0, cache2.Size());
+
+            string testName = TestContext.CurrentContext.Test.Name;
+
+            Console.WriteLine("Test started: " + testName);
+        }
+        
+        /// <summary>
+        /// Verifies that a continuous query built with a <c>null</c> listener is rejected
+        /// with <see cref="ArgumentException"/>.
+        /// </summary>
+        [Test]
+        public void TestValidation()
+        {
+            Assert.Throws<ArgumentException>(
+                () => cache1.QueryContinuous(new ContinuousQuery<int, PortableEntry>(null)));
+        }
+
+        /// <summary>
+        /// Verifies that disposing a continuous query handle a second time, after the
+        /// <c>using</c> block has already disposed it, does not fail.
+        /// </summary>
+        [Test]
+        public void TestMultipleClose()
+        {
+            var locKey = PrimaryKey(cache1);
+            var rmtKey = PrimaryKey(cache2);
+
+            var qry = new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>());
+
+            IDisposable qryHnd;
+
+            using (qryHnd = cache1.QueryContinuous(qry))
+            {
+                // Update issued through the first node's cache.
+                cache1.GetAndPut(locKey, Entry(locKey));
+                CheckCallbackSingle(locKey, null, Entry(locKey));
+
+                // Update issued through the second node's cache.
+                cache2.GetAndPut(rmtKey, Entry(rmtKey));
+                CheckCallbackSingle(rmtKey, null, Entry(rmtKey));
+            }
+
+            // Second dispose: the handle was already disposed when the using block ended.
+            qryHnd.Dispose();
+        }
+
+        /// <summary>
+        /// Runs the regular callback scenario with <c>loc</c> set to <c>false</c>.
+        /// </summary>
+        [Test]
+        public void TestCallback()
+        {
+            CheckCallback(loc: false);
+        }
+
+        /// <summary>
+        /// Exercises put / update / remove on a key that is primary on the first node and on a key
+        /// that is primary on the second node, checking which of them reach the listener, and then
+        /// checks that nothing is delivered once the query has been disposed.
+        /// </summary>
+        /// <param name="loc">
+        /// When <c>true</c> the query is created with its extra boolean constructor argument set to
+        /// <c>true</c> and no callbacks are expected for the second node's key.
+        /// </param>
+        protected void CheckCallback(bool loc)
+        {
+            int key1 = PrimaryKey(cache1);
+            int key2 = PrimaryKey(cache2);
+
+            ContinuousQuery<int, PortableEntry> qry;
+
+            if (loc)
+            {
+                qry = new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>(), true);
+            }
+            else
+            {
+                qry = new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>());
+            }
+
+            using (cache1.QueryContinuous(qry))
+            {
+                // Key owned by the first node: create, update, remove.
+                cache1.GetAndPut(key1, Entry(key1));
+                CheckCallbackSingle(key1, null, Entry(key1));
+
+                cache1.GetAndPut(key1, Entry(key1 + 1));
+                CheckCallbackSingle(key1, Entry(key1), Entry(key1 + 1));
+
+                cache1.Remove(key1);
+                CheckCallbackSingle(key1, Entry(key1 + 1), null);
+
+                // Key owned by the second node: create.
+                cache2.GetAndPut(key2, Entry(key2));
+
+                if (loc)
+                {
+                    CheckNoCallback(100);
+                }
+                else
+                {
+                    CheckCallbackSingle(key2, null, Entry(key2));
+                }
+
+                // Key owned by the second node: update.
+                cache1.GetAndPut(key2, Entry(key2 + 1));
+
+                if (loc)
+                {
+                    CheckNoCallback(100);
+                }
+                else
+                {
+                    CheckCallbackSingle(key2, Entry(key2), Entry(key2 + 1));
+                }
+
+                // Key owned by the second node: remove.
+                cache1.Remove(key2);
+
+                if (loc)
+                {
+                    CheckNoCallback(100);
+                }
+                else
+                {
+                    CheckCallbackSingle(key2, Entry(key2 + 1), null);
+                }
+            }
+
+            // The query is disposed now, so further updates must stay silent.
+            cache1.Put(key1, Entry(key1));
+            CheckNoCallback(100);
+
+            cache1.Put(key2, Entry(key2));
+            CheckNoCallback(100);
+        }
+        
+        /// <summary>
+        /// Verifies that the listener's <c>ignite</c> member is null before the query is started
+        /// and non-null while the query is running.
+        /// </summary>
+        [Test]
+        public void TestCallbackInjection()
+        {
+            var cb = new Listener<PortableEntry>();
+
+            Assert.IsNull(cb.ignite);
+
+            var qry = new ContinuousQuery<int, PortableEntry>(cb);
+
+            using (cache1.QueryContinuous(qry))
+            {
+                Assert.IsNotNull(cb.ignite);
+            }
+        }
+        
+        /// <summary>
+        /// Runs the filter scenario with a portable filter and <c>loc</c> set to <c>false</c>.
+        /// </summary>
+        [Test]
+        public void TestFilterPortable()
+        {
+            CheckFilter(portable: true, loc: false);
+        }
+
+        /// <summary>
+        /// Runs the filter scenario with a serializable filter and <c>loc</c> set to <c>false</c>.
+        /// </summary>
+        [Test]
+        public void TestFilterSerializable()
+        {
+            CheckFilter(portable: false, loc: false);
+        }
+
+        /// <summary>
+        /// Checks filter behaviour: first with the filter result flag left at <c>true</c> (events
+        /// must reach both the filter and the callback), then with
+        /// <c>AbstractFilter&lt;PortableEntry&gt;.res</c> set to <c>false</c> (events must reach the
+        /// filter but not the callback).
+        /// </summary>
+        /// <param name="portable">
+        /// <c>true</c> to use <c>PortableFilter</c>, <c>false</c> to use <c>SerializableFilter</c>.
+        /// </param>
+        /// <param name="loc">
+        /// Local cache flag. When <c>true</c> the query is created with its extra boolean constructor
+        /// argument set to <c>true</c>, and neither filter nor callback events are expected for the
+        /// key that is primary on the second node.
+        /// </param>
+        protected void CheckFilter(bool portable, bool loc)
+        {
+            ICacheEntryEventListener<int, PortableEntry> lsnr = new Listener<PortableEntry>();
+            ICacheEntryEventFilter<int, PortableEntry> filter = 
+                portable ? (AbstractFilter<PortableEntry>)new PortableFilter() : new SerializableFilter();
+
+            ContinuousQuery<int, PortableEntry> qry = loc ? 
+                new ContinuousQuery<int, PortableEntry>(lsnr, filter, true) : 
+                new ContinuousQuery<int, PortableEntry>(lsnr, filter);
+
+            using (cache1.QueryContinuous(qry))
+            {
+                // Put to a key that is primary on the first node.
+                int key1 = PrimaryKey(cache1);
+                cache1.GetAndPut(key1, Entry(key1));
+                CheckFilterSingle(key1, null, Entry(key1));
+                CheckCallbackSingle(key1, null, Entry(key1));
+
+                // Put to a key that is primary on the second node.
+                int key2 = PrimaryKey(cache2);
+                cache1.GetAndPut(key2, Entry(key2));
+
+                if (loc)
+                {
+                    // The argument of these helpers is a timeout in milliseconds, not a key.
+                    // Passing key2 (as was done before) made the wait arbitrary and possibly zero;
+                    // 100 ms matches every other negative check in this method.
+                    CheckNoFilter(100);
+                    CheckNoCallback(100);
+                }
+                else
+                {
+                    CheckFilterSingle(key2, null, Entry(key2));
+                    CheckCallbackSingle(key2, null, Entry(key2));
+                }
+
+                // From here on the filter is told to reject events.
+                AbstractFilter<PortableEntry>.res = false;
+
+                // Ignored put to the first node's key: filter is invoked, callback is not.
+                cache1.GetAndPut(key1, Entry(key1 + 1));
+                CheckFilterSingle(key1, Entry(key1), Entry(key1 + 1));
+                CheckNoCallback(100);
+
+                // Ignored put to the second node's key.
+                cache1.GetAndPut(key2, Entry(key2 + 1));
+
+                if (loc)
+                    CheckNoFilter(100);
+                else
+                    CheckFilterSingle(key2, Entry(key2), Entry(key2 + 1));
+
+                CheckNoCallback(100);
+            }
+        }
+
+        /// <summary>
+        /// Runs the filter invoke-error scenario with a portable filter.
+        /// Currently ignored with reason "IGNITE-521".
+        /// </summary>
+        [Test, Ignore("IGNITE-521")]
+        public void TestFilterInvokeErrorPortable()
+        {
+            CheckFilterInvokeError(portable: true);
+        }
+
+        /// <summary>
+        /// Runs the filter invoke-error scenario with a serializable filter.
+        /// Currently ignored with reason "IGNITE-521".
+        /// </summary>
+        [Test, Ignore("IGNITE-521")]
+        public void TestFilterInvokeErrorSerializable()
+        {
+            CheckFilterInvokeError(portable: false);
+        }
+
+        /// <summary>
+        /// Checks error handling when the filter fails during invocation: with
+        /// <c>AbstractFilter&lt;PortableEntry&gt;.err</c> switched on, a put to either node's primary
+        /// key is expected to throw <see cref="IgniteException"/> (or a subclass).
+        /// </summary>
+        /// <param name="portable">
+        /// <c>true</c> to use <c>PortableFilter</c>, <c>false</c> to use <c>SerializableFilter</c>.
+        /// </param>
+        private void CheckFilterInvokeError(bool portable)
+        {
+            AbstractFilter<PortableEntry>.err = true;
+
+            ICacheEntryEventListener<int, PortableEntry> lsnr = new Listener<PortableEntry>();
+            ICacheEntryEventFilter<int, PortableEntry> filter =
+                portable ? (AbstractFilter<PortableEntry>) new PortableFilter() : new SerializableFilter();
+
+            ContinuousQuery<int, PortableEntry> qry = new ContinuousQuery<int, PortableEntry>(lsnr, filter);
+
+            using (cache1.QueryContinuous(qry))
+            {
+                // Assert.Catch is used instead of try / Assert.Fail / catch (Exception): in that
+                // pattern the exception raised by Assert.Fail is itself caught by the generic
+                // catch clause and re-reported as "Unexpected error.", hiding the real cause.
+
+                // Put to a key that is primary on the first node.
+                Assert.Catch<IgniteException>(() => cache1.GetAndPut(PrimaryKey(cache1), Entry(1)));
+
+                // Put to a key that is primary on the second node.
+                Assert.Catch<IgniteException>(() => cache1.GetAndPut(PrimaryKey(cache2), Entry(1)));
+            }
+        }
+
+        /// <summary>
+        /// Runs the filter marshalling-error scenario with a portable filter.
+        /// </summary>
+        [Test]
+        public void TestFilterMarshalErrorPortable()
+        {
+            CheckFilterMarshalError(portable: true);
+        }
+
+        /// <summary>
+        /// Runs the filter marshalling-error scenario with a serializable filter.
+        /// </summary>
+        [Test]
+        public void TestFilterMarshalErrorSerializable()
+        {
+            CheckFilterMarshalError(portable: false);
+        }
+
+        /// <summary>
+        /// Checks that starting a continuous query throws <see cref="Exception"/> when
+        /// <c>AbstractFilter&lt;PortableEntry&gt;.marshErr</c> is switched on.
+        /// </summary>
+        /// <param name="portable">
+        /// <c>true</c> to use <c>PortableFilter</c>, <c>false</c> to use <c>SerializableFilter</c>.
+        /// </param>
+        private void CheckFilterMarshalError(bool portable)
+        {
+            AbstractFilter<PortableEntry>.marshErr = true;
+
+            ICacheEntryEventListener<int, PortableEntry> lsnr = new Listener<PortableEntry>();
+
+            ICacheEntryEventFilter<int, PortableEntry> filter;
+
+            if (portable)
+            {
+                filter = (AbstractFilter<PortableEntry>)new PortableFilter();
+            }
+            else
+            {
+                filter = new SerializableFilter();
+            }
+
+            var qry = new ContinuousQuery<int, PortableEntry>(lsnr, filter);
+
+            // Starting the query is what is expected to fail; the using body is intentionally empty.
+            TestDelegate startQry = () =>
+            {
+                using (cache1.QueryContinuous(qry))
+                {
+                    // No-op.
+                }
+            };
+
+            Assert.Throws<Exception>(startQry);
+        }
+
+        /// <summary>
+        /// Runs the non-serializable filter scenario with <c>loc</c> set to <c>false</c>.
+        /// </summary>
+        [Test]
+        public void TestFilterNonSerializable()
+        {
+            CheckFilterNonSerializable(loc: false);
+        }
+
+        /// <summary>
+        /// Checks behaviour of a query that uses <c>LocalFilter</c>: with <paramref name="loc"/>
+        /// set the query must start and filter a put to the first node's key; otherwise starting
+        /// the query must throw <see cref="SerializationException"/>.
+        /// </summary>
+        /// <param name="loc">
+        /// When <c>true</c> the query is created with its extra boolean constructor argument set
+        /// to <c>true</c>.
+        /// </param>
+        protected void CheckFilterNonSerializable(bool loc)
+        {
+            AbstractFilter<PortableEntry>.unmarshErr = true;
+
+            ICacheEntryEventListener<int, PortableEntry> lsnr = new Listener<PortableEntry>();
+            ICacheEntryEventFilter<int, PortableEntry> filter = new LocalFilter();
+
+            ContinuousQuery<int, PortableEntry> qry;
+
+            if (loc)
+            {
+                qry = new ContinuousQuery<int, PortableEntry>(lsnr, filter, true);
+            }
+            else
+            {
+                qry = new ContinuousQuery<int, PortableEntry>(lsnr, filter);
+            }
+
+            if (!loc)
+            {
+                // Starting the query is expected to fail.
+                Assert.Throws<SerializationException>(() =>
+                {
+                    using (cache1.QueryContinuous(qry))
+                    {
+                        // No-op.
+                    }
+                });
+
+                return;
+            }
+
+            using (cache1.QueryContinuous(qry))
+            {
+                // A put to the first node's key must pass through the filter.
+                int locKey = PrimaryKey(cache1);
+
+                cache1.GetAndPut(locKey, Entry(locKey));
+                CheckFilterSingle(locKey, null, Entry(locKey));
+            }
+        }
+
+        /// <summary>
+        /// Runs the filter unmarshalling-error scenario with a portable filter.
+        /// Currently ignored with reason "IGNITE-521".
+        /// </summary>
+        [Test, Ignore("IGNITE-521")]
+        public void TestFilterUnmarshalErrorPortable()
+        {
+            CheckFilterUnmarshalError(portable: true);
+        }
+        
+        /// <summary>
+        /// Runs the filter unmarshalling-error scenario with a serializable filter.
+        /// Currently ignored with reason "IGNITE-521".
+        /// </summary>
+        [Test, Ignore("IGNITE-521")]
+        public void TestFilterUnmarshalErrorSerializable()
+        {
+            CheckFilterUnmarshalError(portable: false);
+        }
+
+        /// <summary>
+        /// Checks filter unmarshal error handling: with
+        /// <c>AbstractFilter&lt;PortableEntry&gt;.unmarshErr</c> switched on, a put to the first node's
+        /// key must still be filtered normally, while a put to the second node's key is expected
+        /// to throw <see cref="IgniteException"/> (or a subclass).
+        /// </summary>
+        /// <param name="portable">
+        /// <c>true</c> to use <c>PortableFilter</c>, <c>false</c> to use <c>SerializableFilter</c>.
+        /// </param>
+        private void CheckFilterUnmarshalError(bool portable)
+        {
+            AbstractFilter<PortableEntry>.unmarshErr = true;
+
+            ICacheEntryEventListener<int, PortableEntry> lsnr = new Listener<PortableEntry>();
+            ICacheEntryEventFilter<int, PortableEntry> filter =
+                portable ? (AbstractFilter<PortableEntry>)new PortableFilter() : new SerializableFilter();
+
+            ContinuousQuery<int, PortableEntry> qry = new ContinuousQuery<int, PortableEntry>(lsnr, filter);
+
+            using (cache1.QueryContinuous(qry))
+            {
+                // Local put must be fine.
+                int key1 = PrimaryKey(cache1);
+                cache1.GetAndPut(key1, Entry(key1));
+                CheckFilterSingle(key1, null, Entry(key1));
+
+                // Remote put must fail. Assert.Catch replaces try / Assert.Fail / catch (Exception),
+                // where the exception raised by Assert.Fail was itself caught by the generic catch
+                // clause and re-reported as "Unexpected error.".
+                Assert.Catch<IgniteException>(() => cache1.GetAndPut(PrimaryKey(cache2), Entry(1)));
+            }
+        }
+
+        /// <summary>
+        /// Verifies that the <c>ignite</c> member is populated both on the filter instance passed
+        /// to the query and on the filter event recorded for a put to the second node's key.
+        /// </summary>
+        [Test]
+        public void TestFilterInjection()
+        {
+            var cb = new Listener<PortableEntry>();
+            var filter = new PortableFilter();
+
+            Assert.IsNull(filter.ignite);
+
+            var qry = new ContinuousQuery<int, PortableEntry>(cb, filter);
+
+            using (cache1.QueryContinuous(qry))
+            {
+                // Injection into the filter instance held by this test.
+                Assert.IsNotNull(filter.ignite);
+
+                // Injection observed through the event produced by a put to the second node's key.
+                cache1.GetAndPut(PrimaryKey(cache2), Entry(1));
+
+                FilterEvent rmtEvt;
+                bool taken = FILTER_EVTS.TryTake(out rmtEvt, 500);
+
+                Assert.IsTrue(taken);
+                Assert.IsNotNull(rmtEvt.ignite);
+            }
+        }
+
+
+        /// <summary>
+        /// Tests the "keep-portable" scenario: the query is started on a cache obtained through
+        /// <c>WithKeepPortable</c>, and the values seen by filter and callback are checked by
+        /// deserializing them from <c>IPortableObject</c> back to <c>PortableEntry</c>.
+        /// </summary>
+        [Test]
+        public void TestKeepPortable()
+        {
+            var portCache = cache1.WithKeepPortable<int, IPortableObject>();
+
+            var qry = new ContinuousQuery<int, IPortableObject>(
+                new Listener<IPortableObject>(), new KeepPortableFilter());
+
+            using (portCache.QueryContinuous(qry))
+            {
+                CallbackEvent cbEvt;
+                FilterEvent filterEvt;
+
+                // 1. Put to a key that is primary on the first node.
+                cache1.GetAndPut(PrimaryKey(cache1), Entry(1));
+
+                Assert.IsTrue(FILTER_EVTS.TryTake(out filterEvt, 500));
+                Assert.AreEqual(PrimaryKey(cache1), filterEvt.entry.Key);
+                Assert.AreEqual(null, filterEvt.entry.OldValue);
+
+                var locFilterVal = filterEvt.entry.Value as IPortableObject;
+                Assert.AreEqual(Entry(1), locFilterVal.Deserialize<PortableEntry>());
+
+                Assert.IsTrue(CB_EVTS.TryTake(out cbEvt, 500));
+                Assert.AreEqual(1, cbEvt.entries.Count);
+                Assert.AreEqual(PrimaryKey(cache1), cbEvt.entries.First().Key);
+                Assert.AreEqual(null, cbEvt.entries.First().OldValue);
+
+                var locCbVal = cbEvt.entries.First().Value as IPortableObject;
+                Assert.AreEqual(Entry(1), locCbVal.Deserialize<PortableEntry>());
+
+                // 2. Put to a key that is primary on the second node.
+                cache1.GetAndPut(PrimaryKey(cache2), Entry(2));
+
+                Assert.IsTrue(FILTER_EVTS.TryTake(out filterEvt, 500));
+                Assert.AreEqual(PrimaryKey(cache2), filterEvt.entry.Key);
+                Assert.AreEqual(null, filterEvt.entry.OldValue);
+
+                var rmtFilterVal = filterEvt.entry.Value as IPortableObject;
+                Assert.AreEqual(Entry(2), rmtFilterVal.Deserialize<PortableEntry>());
+
+                Assert.IsTrue(CB_EVTS.TryTake(out cbEvt, 500));
+                Assert.AreEqual(1, cbEvt.entries.Count);
+                Assert.AreEqual(PrimaryKey(cache2), cbEvt.entries.First().Key);
+                Assert.AreEqual(null, cbEvt.entries.First().OldValue);
+
+                var rmtCbVal = cbEvt.entries.First().Value as IPortableObject;
+                Assert.AreEqual(Entry(2), rmtCbVal.Deserialize<PortableEntry>());
+            }
+        }
+
+        /// <summary>
+        /// Tests that the buffer size is honoured: with <c>BufferSize = 2</c> and a very long
+        /// <c>TimeInterval</c>, the first update to a second-node key must not produce a callback,
+        /// and the second update must produce one callback carrying both entries.
+        /// </summary>
+        [Test]
+        public void TestBufferSize()
+        {
+            // Pick two keys that are primary on the second node (nothing is written yet).
+            List<int> rmtKeys = PrimaryKeys(cache2, 2);
+
+            ContinuousQuery<int, PortableEntry> qry = new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>());
+
+            qry.BufferSize = 2;
+            qry.TimeInterval = TimeSpan.FromMilliseconds(1000000);
+
+            try
+            {
+                using (cache1.QueryContinuous(qry))
+                {
+                    // NOTE(review): re-assigning BufferSize after the query has started looks
+                    // redundant; kept as-is because the setter's behaviour is not visible here.
+                    qry.BufferSize = 2;
+
+                    // First update: buffer holds one entry, nothing must be delivered yet.
+                    cache1.GetAndPut(rmtKeys[0], Entry(rmtKeys[0]));
+
+                    CheckNoCallback(100);
+
+                    // Second update: a single event with both entries is expected.
+                    cache1.GetAndPut(rmtKeys[1], Entry(rmtKeys[1]));
+
+                    CallbackEvent evt;
+
+                    Assert.IsTrue(CB_EVTS.TryTake(out evt, 1000));
+
+                    Assert.AreEqual(2, evt.entries.Count);
+
+                    var entryRmt0 = evt.entries.Single(entry => entry.Key.Equals(rmtKeys[0]));
+                    var entryRmt1 = evt.entries.Single(entry => entry.Key.Equals(rmtKeys[1]));
+
+                    Assert.AreEqual(rmtKeys[0], entryRmt0.Key);
+                    Assert.IsNull(entryRmt0.OldValue);
+                    Assert.AreEqual(Entry(rmtKeys[0]), entryRmt0.Value);
+
+                    Assert.AreEqual(rmtKeys[1], entryRmt1.Key);
+                    Assert.IsNull(entryRmt1.OldValue);
+                    Assert.AreEqual(Entry(rmtKeys[1]), entryRmt1.Value);
+                }
+            }
+            finally
+            {
+                // BeforeTest removes only one primary key per node and then asserts that the caches
+                // are empty, so these keys must be removed even when an assertion above fails;
+                // otherwise every subsequent test would fail in its set-up.
+                cache1.Remove(rmtKeys[0]);
+                cache1.Remove(rmtKeys[1]);
+            }
+        }
+
+        /// <summary>
+        /// Tests the time interval: with <c>BufferSize = 2</c> and <c>TimeInterval</c> of 500 ms,
+        /// a single update to the second node's key must not be delivered within 100 ms but must
+        /// arrive within the following 1000 ms.
+        /// </summary>
+        [Test]
+        public void TestTimeout()
+        {
+            var locKey = PrimaryKey(cache1);
+            var rmtKey = PrimaryKey(cache2);
+
+            var qry = new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>());
+
+            qry.BufferSize = 2;
+            qry.TimeInterval = TimeSpan.FromMilliseconds(500);
+
+            using (cache1.QueryContinuous(qry))
+            {
+                // Update of the first node's key is delivered right away.
+                cache1.GetAndPut(locKey, Entry(locKey));
+                CheckCallbackSingle(locKey, null, Entry(locKey));
+
+                // Update of the second node's key: silent at first, delivered later.
+                cache1.GetAndPut(rmtKey, Entry(rmtKey));
+                CheckNoCallback(100);
+                CheckCallbackSingle(rmtKey, null, Entry(rmtKey), 1000);
+            }
+        }
+
+        /// <summary>
+        /// Tests a nested Ignite API call from a callback: starts a query with a
+        /// <c>NestedCallListener</c>, performs one put and blocks until the listener's
+        /// <c>countDown</c> is signalled.
+        /// </summary>
+        [Test]
+        public void TestNestedCallFromCallback()
+        {
+            var portCache = cache1.WithKeepPortable<int, IPortableObject>();
+
+            var locKey = PrimaryKey(cache1);
+
+            var lsnr = new NestedCallListener();
+            var qry = new ContinuousQuery<int, IPortableObject>(lsnr);
+
+            using (portCache.QueryContinuous(qry))
+            {
+                cache1.GetAndPut(locKey, Entry(locKey));
+
+                // Waits without a timeout until the listener signals completion.
+                lsnr.countDown.Wait();
+            }
+
+            portCache.Remove(locKey);
+        }
+
+        /// <summary>
+        /// Tests the initial query with scan, SQL and text queries, reading the cursor both through
+        /// <c>GetAll</c> and through enumeration, and checks the error reported for an invalid
+        /// text query.
+        /// </summary>
+        [Test]
+        public void TestInitialQuery()
+        {
+            // Two ways of draining the initial query cursor.
+            Func<IQueryCursor<ICacheEntry<int, PortableEntry>>, IEnumerable<ICacheEntry<int, PortableEntry>>>
+                viaGetAll = cur => cur.GetAll();
+
+            Func<IQueryCursor<ICacheEntry<int, PortableEntry>>, IEnumerable<ICacheEntry<int, PortableEntry>>>
+                viaIterator = cur => cur.ToList();
+
+            // Scan query.
+            TestInitialQuery(new ScanQuery<int, PortableEntry>(new InitialQueryScanFilter()), viaGetAll);
+            TestInitialQuery(new ScanQuery<int, PortableEntry>(new InitialQueryScanFilter()), viaIterator);
+
+            // Sql query.
+            TestInitialQuery(new SqlQuery(typeof(PortableEntry), "val < 33"), viaGetAll);
+            TestInitialQuery(new SqlQuery(typeof(PortableEntry), "val < 33"), viaIterator);
+
+            // Text query.
+            TestInitialQuery(new TextQuery(typeof(PortableEntry), "1*"), viaGetAll);
+            TestInitialQuery(new TextQuery(typeof(PortableEntry), "1*"), viaIterator);
+
+            // Invalid initial query must surface as IgniteException with the parser's message.
+            var err = Assert.Throws<IgniteException>(
+                () => TestInitialQuery(new TextQuery(typeof (PortableEntry), "*"), viaGetAll));
+
+            Assert.AreEqual("Cannot parse '*': '*' or '?' not allowed as first character in WildcardQuery", err.Message);
+        }
+
+        /// <summary>
+        /// Runs one initial-query scenario: seeds the cache with keys 11, 12 and 33, starts a
+        /// continuous query with the given initial query, expects exactly entries 11 and 12 from the
+        /// initial cursor, then checks that a later put of key 44 is delivered to the listener.
+        /// The cache is cleared afterwards regardless of the outcome.
+        /// </summary>
+        /// <param name="initialQry">Initial query passed to <c>QueryContinuous</c>.</param>
+        /// <param name="getAllFunc">Function that reads all entries from the initial query cursor.</param>
+        private void TestInitialQuery(QueryBase initialQry, Func<IQueryCursor<ICacheEntry<int, PortableEntry>>, 
+            IEnumerable<ICacheEntry<int, PortableEntry>>> getAllFunc)
+        {
+            var qry = new ContinuousQuery<int, PortableEntry>(new Listener<PortableEntry>());
+
+            // Seed data.
+            foreach (var seed in new[] {11, 12, 33})
+            {
+                cache1.Put(seed, Entry(seed));
+            }
+
+            try
+            {
+                IContinuousQueryHandle<ICacheEntry<int, PortableEntry>> contQry;
+
+                using (contQry = cache1.QueryContinuous(qry, initialQry))
+                {
+                    // Initial query results, de-duplicated and ordered by key.
+                    var cursor = contQry.GetInitialQueryCursor();
+                    var initialEntries = getAllFunc(cursor).Distinct().OrderBy(x => x.Key).ToList();
+
+                    // The cursor can be requested only once.
+                    Assert.Throws<InvalidOperationException>(() => contQry.GetInitialQueryCursor());
+
+                    Assert.AreEqual(2, initialEntries.Count);
+
+                    for (var idx = 0; idx < initialEntries.Count; idx++)
+                    {
+                        Assert.AreEqual(idx + 11, initialEntries[idx].Key);
+                        Assert.AreEqual(idx + 11, initialEntries[idx].Value.val);
+                    }
+
+                    // Continuous part: a new entry must reach the listener.
+                    cache1.Put(44, Entry(44));
+                    CheckCallbackSingle(44, null, Entry(44));
+                }
+
+                // After disposal the cursor is no longer available.
+                Assert.Throws<ObjectDisposedException>(() => contQry.GetInitialQueryCursor());
+
+                contQry.Dispose();  // multiple dispose calls are ok
+            }
+            finally
+            {
+                cache1.Clear();
+            }
+        }
+
+        /// <summary>
+        /// Checks a single filter event, waiting up to 1000 ms for it.
+        /// </summary>
+        /// <param name="expKey">Expected key.</param>
+        /// <param name="expOldVal">Expected old value.</param>
+        /// <param name="expVal">Expected new value.</param>
+        private void CheckFilterSingle(int expKey, PortableEntry expOldVal, PortableEntry expVal)
+        {
+            const int dfltTimeout = 1000;
+
+            CheckFilterSingle(expKey, expOldVal, expVal, dfltTimeout);
+        }
+
+        /// <summary>
+        /// Takes one event from <c>FILTER_EVTS</c> within the timeout and asserts that its key,
+        /// old value and new value match the expectations.
+        /// </summary>
+        /// <param name="expKey">Expected key.</param>
+        /// <param name="expOldVal">Expected old value.</param>
+        /// <param name="expVal">Expected new value.</param>
+        /// <param name="timeout">Maximum wait for the event, in milliseconds.</param>
+        private void CheckFilterSingle(int expKey, PortableEntry expOldVal, PortableEntry expVal, int timeout)
+        {
+            FilterEvent filterEvt;
+
+            bool taken = FILTER_EVTS.TryTake(out filterEvt, timeout);
+
+            Assert.IsTrue(taken);
+
+            Assert.AreEqual(expKey, filterEvt.entry.Key);
+            Assert.AreEqual(expOldVal, filterEvt.entry.OldValue);
+            Assert.AreEqual(expVal, filterEvt.entry.Value);
+        }
+
+        /// <summary>
+        /// Asserts that no filter event appears in <c>FILTER_EVTS</c> within the timeout.
+        /// </summary>
+        /// <param name="timeout">How long to wait, in milliseconds.</param>
+        private void CheckNoFilter(int timeout)
+        {
+            FilterEvent ignored;
+
+            bool taken = FILTER_EVTS.TryTake(out ignored, timeout);
+
+            Assert.IsFalse(taken);
+        }
+
+        /// <summary>
+        /// Checks a single callback event, waiting up to 1000 ms for it.
+        /// </summary>
+        /// <param name="expKey">Expected key.</param>
+        /// <param name="expOldVal">Expected old value.</param>
+        /// <param name="expVal">Expected new value.</param>
+        private void CheckCallbackSingle(int expKey, PortableEntry expOldVal, PortableEntry expVal)
+        {
+            const int dfltTimeout = 1000;
+
+            CheckCallbackSingle(expKey, expOldVal, expVal, dfltTimeout);
+        }
+
+        /// <summary>
+        /// Takes one event from <c>CB_EVTS</c> within the timeout and asserts that it carries exactly
+        /// one entry whose key, old value and new value match the expectations.
+        /// </summary>
+        /// <param name="expKey">Expected key.</param>
+        /// <param name="expOldVal">Expected old value.</param>
+        /// <param name="expVal">Expected new value.</param>
+        /// <param name="timeout">Maximum wait for the event, in milliseconds.</param>
+        private void CheckCallbackSingle(int expKey, PortableEntry expOldVal, PortableEntry expVal, int timeout)
+        {
+            CallbackEvent cbEvt;
+
+            bool taken = CB_EVTS.TryTake(out cbEvt, timeout);
+
+            Assert.IsTrue(taken);
+
+            Assert.AreEqual(1, cbEvt.entries.Count);
+
+            // Exactly one entry is present at this point.
+            var single = cbEvt.entries.First();
+
+            Assert.AreEqual(expKey, single.Key);
+            Assert.AreEqual(expOldVal, single.OldValue);
+            Assert.AreEqual(expVal, single.Value);
+        }
+
+        /// <summary>
+        /// Ensure that no callback events are logged.
+        /// </summary>
+        /// <param name="timeout">Timeout.</param>
+        private void CheckNoCallback(int timeout)
+        {
+            CallbackEvent evt;
+
+            Assert.IsFalse(CB_EVTS.TryTake(out evt, timeout));
+        }
+
+        /// <summary>
+        /// Create entry.
+        /// </summary>
+        /// <param name="val">Value.</param>
+        /// <returns>Entry.</returns>
+        private static PortableEntry Entry(int val)
+        {
+            return new PortableEntry(val);
+        }
+
+        /// <summary>
+        /// Get primary key for cache.
+        /// </summary>
+        /// <param name="cache">Cache.</param>
+        /// <returns>Primary key.</returns>
+        private static int PrimaryKey<T>(ICache<int, T> cache)
+        {
+            return PrimaryKeys(cache, 1)[0];
+        }
+
+        /// <summary>
+        /// Get primary keys for cache.
+        /// </summary>
+        /// <param name="cache">Cache.</param>
+        /// <param name="cnt">Amount of keys.</param>
+        /// <param name="startFrom">Value to start from.</param>
+        /// <returns>Keys for which the local node is primary.</returns>
+        private static List<int> PrimaryKeys<T>(ICache<int, T> cache, int cnt, int startFrom = 0)
+        {
+            IClusterNode node = cache.Ignite.Cluster.LocalNode;
+
+            ICacheAffinity aff = cache.Ignite.Affinity(cache.Name);
+
+            List<int> keys = new List<int>(cnt);
+
+            for (int i = startFrom; i < startFrom + 100000; i++)
+            {
+                if (aff.IsPrimary(node, i))
+                {
+                    keys.Add(i);
+
+                    if (keys.Count == cnt)
+                        return keys;
+                }
+            }
+
+            Assert.Fail("Failed to find " + cnt + " primary keys.");
+
+            return null;
+        }
+
+        /// <summary>
+        /// Portable entry.
+        /// </summary>
+        public class PortableEntry
+        {
+            /** Value. */
+            public readonly int val;
+
+            /** <inheritDoc /> */
+            public override int GetHashCode()
+            {
+                return val;
+            }
+
+            /// <summary>
+            /// Constructor.
+            /// </summary>
+            /// <param name="val">Value.</param>
+            public PortableEntry(int val)
+            {
+                this.val = val;
+            }
+
+            /** <inheritDoc /> */
+            public override bool Equals(object obj)
+            {
+                return obj != null && obj is PortableEntry && ((PortableEntry)obj).val == val;
+            }
+        }
+
+        /// <summary>
+        /// Abstract filter.
+        /// </summary>
+        [Serializable]
+        public abstract class AbstractFilter<V> : ICacheEntryEventFilter<int, V>
+        {
+            /** Result. */
+            public static volatile bool res = true;
+
+            /** Throw error on invocation. */
+            public static volatile bool err;
+
+            /** Throw error during marshalling. */
+            public static volatile bool marshErr;
+
+            /** Throw error during unmarshalling. */
+            public static volatile bool unmarshErr;
+
+            /** Grid. */
+            [InstanceResource]
+            public IIgnite ignite;
+
+            /** <inheritDoc /> */
+            public bool Evaluate(ICacheEntryEvent<int, V> evt)
+            {
+                if (err)
+                    throw new Exception("Filter error.");
+
+                FILTER_EVTS.Add(new FilterEvent(ignite,
+                    CQU.CreateEvent<object, object>(evt.Key, evt.OldValue, evt.Value)));
+
+                return res;
+            }
+        }
+
+        /// <summary>
+        /// Filter which cannot be serialized.
+        /// </summary>
+        public class LocalFilter : AbstractFilter<PortableEntry>
+        {
+            // No-op.
+        }
+
+        /// <summary>
+        /// Portable filter.
+        /// </summary>
+        public class PortableFilter : AbstractFilter<PortableEntry>, IPortableMarshalAware
+        {
+            /** <inheritDoc /> */
+            public void WritePortable(IPortableWriter writer)
+            {
+                if (marshErr)
+                    throw new Exception("Filter marshalling error.");
+            }
+
+            /** <inheritDoc /> */
+            public void ReadPortable(IPortableReader reader)
+            {
+                if (unmarshErr)
+                    throw new Exception("Filter unmarshalling error.");
+            }
+        }
+
+        /// <summary>
+        /// Serializable filter.
+        /// </summary>
+        [Serializable]
+        public class SerializableFilter : AbstractFilter<PortableEntry>, ISerializable
+        {
+            /// <summary>
+            /// Constructor.
+            /// </summary>
+            public SerializableFilter()
+            {
+                // No-op.
+            }
+
+            /// <summary>
+            /// Serialization constructor.
+            /// </summary>
+            /// <param name="info">Info.</param>
+            /// <param name="context">Context.</param>
+            protected SerializableFilter(SerializationInfo info, StreamingContext context)
+            {
+                if (unmarshErr)
+                    throw new Exception("Filter unmarshalling error.");
+            }
+
+            /** <inheritDoc /> */
+            public void GetObjectData(SerializationInfo info, StreamingContext context)
+            {
+                if (marshErr)
+                    throw new Exception("Filter marshalling error.");
+            }
+        }
+
+        /// <summary>
+        /// Filter for "keep-portable" scenario.
+        /// </summary>
+        public class KeepPortableFilter : AbstractFilter<IPortableObject>
+        {
+            // No-op.
+        }
+
+        /// <summary>
+        /// Listener.
+        /// </summary>
+        public class Listener<V> : ICacheEntryEventListener<int, V>
+        {
+            [InstanceResource]
+            public IIgnite ignite;
+            
+            /** <inheritDoc /> */
+            public void OnEvent(IEnumerable<ICacheEntryEvent<int, V>> evts)
+            {
+                ICollection<ICacheEntryEvent<object, object>> entries0 =
+                    new List<ICacheEntryEvent<object, object>>();
+
+                foreach (ICacheEntryEvent<int, V> evt in evts)
+                    entries0.Add(CQU.CreateEvent<object, object>(evt.Key, evt.OldValue, evt.Value));
+
+                CB_EVTS.Add(new CallbackEvent(entries0));
+            }
+        }
+
+        /// <summary>
+        /// Listener with nested Ignite API call.
+        /// </summary>
+        public class NestedCallListener : ICacheEntryEventListener<int, IPortableObject>
+        {
+            /** Event. */
+            public readonly CountdownEvent countDown = new CountdownEvent(1);
+
+            public void OnEvent(IEnumerable<ICacheEntryEvent<int, IPortableObject>> evts)
+            {
+                foreach (ICacheEntryEvent<int, IPortableObject> evt in evts)
+                {
+                    IPortableObject val = evt.Value;
+
+                    IPortableMetadata meta = val.Metadata();
+
+                    Assert.AreEqual(typeof(PortableEntry).Name, meta.TypeName);
+                }
+
+                countDown.Signal();
+            }
+        }
+
+        /// <summary>
+        /// Filter event.
+        /// </summary>
+        public class FilterEvent
+        {
+            /** Grid. */
+            public IIgnite ignite;
+
+            /** Entry. */
+            public ICacheEntryEvent<object, object> entry;
+
+            /// <summary>
+            /// Constructor.
+            /// </summary>
+            /// <param name="ignite">Grid.</param>
+            /// <param name="entry">Entry.</param>
+            public FilterEvent(IIgnite ignite, ICacheEntryEvent<object, object> entry)
+            {
+                this.ignite = ignite;
+                this.entry = entry;
+            }
+        }
+
+        /// <summary>
+        /// Callback event.
+        /// </summary>
+        public class CallbackEvent
+        {
+            /** Entries. */
+            public ICollection<ICacheEntryEvent<object, object>> entries;
+
+            /// <summary>
+            /// Constructor.
+            /// </summary>
+            /// <param name="entries">Entries.</param>
+            public CallbackEvent(ICollection<ICacheEntryEvent<object, object>> entries)
+            {
+                this.entries = entries;
+            }
+        }
+
+        /// <summary>
+        /// ScanQuery filter for InitialQuery test.
+        /// </summary>
+        [Serializable]
+        private class InitialQueryScanFilter : ICacheEntryFilter<int, PortableEntry>
+        {
+            /** <inheritdoc /> */
+            public bool Invoke(ICacheEntry<int, PortableEntry> entry)
+            {
+                return entry.Key < 33;
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicBackupTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicBackupTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicBackupTest.cs
new file mode 100644
index 0000000..ac44f10
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicBackupTest.cs
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous
+{
+    /// <summary>
+    /// Continuous query tests for ATOMIC cache with backups.
+    /// </summary>
+    public class ContinuousQueryAtomiclBackupTest : ContinuousQueryAbstractTest
+    {
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        public ContinuousQueryAtomiclBackupTest() : base(CACHE_ATOMIC_BACKUP)
+        {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicNoBackupTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicNoBackupTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicNoBackupTest.cs
new file mode 100644
index 0000000..8e1a18f
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryAtomicNoBackupTest.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous
+{
+    /// <summary>
+    /// Continuous query tests for ATOMIC cache with no backups.
+    /// </summary>
+    public class ContinuousQueryAtomiclNoBackupTest : ContinuousQueryNoBackupAbstractTest
+    {
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        public ContinuousQueryAtomiclNoBackupTest()
+            : base(CACHE_ATOMIC_NO_BACKUP)
+        {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryNoBackupAbstractTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryNoBackupAbstractTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryNoBackupAbstractTest.cs
new file mode 100644
index 0000000..aa7d627
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryNoBackupAbstractTest.cs
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous
+{
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Tests for continuous query when there are no backups.
+    /// </summary>
+    public abstract class ContinuousQueryNoBackupAbstractTest : ContinuousQueryAbstractTest
+    {
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="cacheName">Cache name.</param>
+        protected ContinuousQueryNoBackupAbstractTest(string cacheName) : base(cacheName)
+        {
+            // No-op.
+        }
+
+        /// <summary>
+        /// Test regular callback operations for local query.
+        /// </summary>
+        [Test]
+        public void TestCallbackLocal()
+        {
+            CheckCallback(true);
+        }
+
+        /// <summary>
+        /// Test portable filter logic.
+        /// </summary>
+        [Test]
+        public void TestFilterPortableLocal()
+        {
+            CheckFilter(true, true);
+        }
+
+        /// <summary>
+        /// Test serializable filter logic.
+        /// </summary>
+        [Test]
+        public void TestFilterSerializableLocal()
+        {
+            CheckFilter(false, true);
+        }
+
+        /// <summary>
+        /// Test non-serializable filter for local query.
+        /// </summary>
+        [Test]
+        public void TestFilterNonSerializableLocal()
+        {
+            CheckFilterNonSerializable(true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalBackupTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalBackupTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalBackupTest.cs
new file mode 100644
index 0000000..08ae88c
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalBackupTest.cs
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous
+{
+    /// <summary>
+    /// Continuous query tests for TRANSACTIONAL cache with backups.
+    /// </summary>
+    public class ContinuousQueryTransactionalBackupTest : ContinuousQueryAbstractTest
+    {
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        public ContinuousQueryTransactionalBackupTest()
+            : base(CACHE_TX_BACKUP)
+        {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalNoBackupTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalNoBackupTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalNoBackupTest.cs
new file mode 100644
index 0000000..685f7b4
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Query/Continuous/ContinuousQueryTransactionalNoBackupTest.cs
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Query.Continuous
+{
+    /// <summary>
+    /// Continuous query tests for TRANSACTIONAL cache with no backups.
+    /// </summary>
+    public class ContinuousQueryTransactionalNoBackupTest : ContinuousQueryNoBackupAbstractTest
+    {
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        public ContinuousQueryTransactionalNoBackupTest() : base(CACHE_TX_NO_BACKUP)
+        {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheParallelLoadStoreTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheParallelLoadStoreTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheParallelLoadStoreTest.cs
new file mode 100644
index 0000000..33eec7b
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheParallelLoadStoreTest.cs
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Store
+{
+    using System;
+    using Apache.Ignite.Core.Cache;
+    using Apache.Ignite.Core.Portable;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Tests for GridCacheParallelLoadStoreAdapter.
+    /// </summary>
+    public class CacheParallelLoadStoreTest
+    {
+        // object store name
+        private const string ObjectStoreCacheName = "object_store_parallel";
+
+        /// <summary>
+        /// Set up test class.
+        /// </summary>
+        [TestFixtureSetUp]
+        public virtual void BeforeTests()
+        {
+            TestUtils.KillProcesses();
+            TestUtils.JvmDebug = true;
+
+            Ignition.Start(new IgniteConfiguration
+            {
+                JvmClasspath = TestUtils.CreateTestClasspath(),
+                JvmOptions = TestUtils.TestJavaOptions(),
+                SpringConfigUrl = "config\\native-client-test-cache-parallel-store.xml",
+                PortableConfiguration = new PortableConfiguration
+                {
+                    Types = new[] {typeof (CacheTestParallelLoadStore.Record).FullName}
+                }
+            });
+        }
+
+        /// <summary>
+        /// Tear down test class.
+        /// </summary>
+        [TestFixtureTearDown]
+        public virtual void AfterTests()
+        {
+            Ignition.StopAll(true);
+        }
+
+        /// <summary>
+        /// Test setup.
+        /// </summary>
+        [SetUp]
+        public void BeforeTest()
+        {
+            Console.WriteLine("Test started: " + TestContext.CurrentContext.Test.Name);
+        }
+
+        /// <summary>
+        /// Tests that LocalLoadCache loads every record with id of at least minId and that loading ran on multiple threads.
+        /// </summary>
+        [Test]
+        public void TestLoadCache()
+        {
+            var cache = GetCache();
+
+            Assert.AreEqual(0, cache.Size());
+
+            const int minId = 113;
+            const int expectedItemCount = CacheTestParallelLoadStore.InputDataLength - minId;
+
+            CacheTestParallelLoadStore.ResetCounters();
+
+            cache.LocalLoadCache(null, minId);
+
+            Assert.AreEqual(expectedItemCount, cache.Size());
+
+            // check items presence over the whole loaded id range [minId, InputDataLength); step by 100 to speed up the test
+            for (var i = minId; i < CacheTestParallelLoadStore.InputDataLength; i += 100)
+            {
+                var rec = cache.Get(i);
+                Assert.AreEqual(i, rec.Id);
+            }
+
+            // check that items were processed in parallel: at least one distinct thread per processor
+            Assert.GreaterOrEqual(CacheTestParallelLoadStore.UniqueThreadCount, Environment.ProcessorCount);
+        }
+
+        /// <summary>
+        /// Gets the cache.
+        /// </summary>
+        private static ICache<int, CacheTestParallelLoadStore.Record> GetCache()
+        {
+            return Ignition.GetIgnite().Cache<int, CacheTestParallelLoadStore.Record>(ObjectStoreCacheName);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
new file mode 100644
index 0000000..bc55901
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Store
+{
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Linq;
+    using Apache.Ignite.Core.Cache.Store;
+    using Apache.Ignite.Core.Impl;
+    using Apache.Ignite.Core.Resource;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Tests for store session.
+    /// </summary>
+    public class CacheStoreSessionTest
+    {
+        /** Grid name. */
+        private const string IgniteName = "grid";
+
+        /** Cache 1 name. */
+        private const string Cache1 = "cache1";
+
+        /** Cache 2 name. */
+        private const string Cache2 = "cache2";
+
+        /** Operations. */
+        private static ConcurrentBag<ICollection<Operation>> _dumps;
+
+        /// <summary>
+        /// Set up routine.
+        /// </summary>
+        [TestFixtureSetUp]
+        public virtual void BeforeTests()
+        {
+            //TestUtils.JVM_DEBUG = true;
+
+            TestUtils.KillProcesses();
+
+            TestUtils.JvmDebug = true;
+
+            IgniteConfigurationEx cfg = new IgniteConfigurationEx
+            {
+                GridName = IgniteName,
+                JvmClasspath = TestUtils.CreateTestClasspath(),
+                JvmOptions = TestUtils.TestJavaOptions(),
+                SpringConfigUrl = @"config\cache\store\cache-store-session.xml"
+            };
+
+
+            Ignition.Start(cfg);
+        }
+
+        /// <summary>
+        /// Tear down routine.
+        /// </summary>
+        [TestFixtureTearDown]
+        public virtual void AfterTests()
+        {
+            Ignition.StopAll(true);
+        }
+        
+        /// <summary>
+        /// Test basic session API.
+        /// </summary>
+        [Test]
+        public void TestSession()
+        {
+            _dumps = new ConcurrentBag<ICollection<Operation>>();
+
+            var ignite = Ignition.GetIgnite(IgniteName);
+
+            var cache1 = Ignition.GetIgnite(IgniteName).Cache<int, int>(Cache1);
+            var cache2 = Ignition.GetIgnite(IgniteName).Cache<int, int>(Cache2);
+
+            // 1. Test rollback.
+            using (var tx = ignite.Transactions.TxStart())
+            {
+                cache1.Put(1, 1);
+                cache2.Put(2, 2);
+
+                tx.Rollback();
+            }
+
+            Assert.AreEqual(1, _dumps.Count);
+            var ops = _dumps.First();
+            Assert.AreEqual(1, ops.Count);
+
+            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.SesEnd && !op.Commit));
+
+            _dumps = new ConcurrentBag<ICollection<Operation>>();
+
+            // 2. Test puts.
+            using (var tx = ignite.Transactions.TxStart())
+            {
+                cache1.Put(1, 1);
+                cache2.Put(2, 2);
+
+                tx.Commit();
+            }
+
+            Assert.AreEqual(1, _dumps.Count);
+            ops = _dumps.First();
+            Assert.AreEqual(3, ops.Count);
+
+            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Write && Cache1.Equals(op.CacheName) && 1.Equals(op.Key) && 1.Equals(op.Value)));
+            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Write && Cache2.Equals(op.CacheName) && 2.Equals(op.Key) && 2.Equals(op.Value)));
+            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.SesEnd && op.Commit));
+
+            _dumps = new ConcurrentBag<ICollection<Operation>>();
+
+            // 3. Test removes.
+            using (var tx = ignite.Transactions.TxStart())
+            {
+                cache1.Remove(1);
+                cache2.Remove(2);
+
+                tx.Commit();
+            }
+
+            Assert.AreEqual(1, _dumps.Count);
+            ops = _dumps.First();
+            Assert.AreEqual(3, ops.Count);
+
+            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Delete && Cache1.Equals(op.CacheName) && 1.Equals(op.Key)));
+            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.Delete && Cache2.Equals(op.CacheName) && 2.Equals(op.Key)));
+            Assert.AreEqual(1, ops.Count(op => op.Type == OperationType.SesEnd && op.Commit));
+        }
+
+        /// <summary>
+        /// Dump operations.
+        /// </summary>
+        /// <param name="dump">Dump.</param>
+        internal static void DumpOperations(ICollection<Operation> dump)
+        {
+            _dumps.Add(dump);
+        }
+
+        /// <summary>
+        /// Test store implementation.
+        /// </summary>
+        public class Store : CacheStoreAdapter
+        {
+            /** Store session. */
+            [StoreSessionResource]
+#pragma warning disable 649
+            private ICacheStoreSession _ses;
+#pragma warning restore 649
+
+            /** <inheritdoc /> */
+            public override object Load(object key)
+            {
+                throw new NotImplementedException();
+            }
+
+            /** <inheritdoc /> */
+            public override void Write(object key, object val)
+            {
+                GetOperations().Add(new Operation(_ses.CacheName, OperationType.Write, (int)key, (int)val));
+            }
+
+            /** <inheritdoc /> */
+            public override void Delete(object key)
+            {
+                GetOperations().Add(new Operation(_ses.CacheName, OperationType.Delete, (int)key, 0));
+            }
+
+            /** <inheritdoc /> */
+            public override void SessionEnd(bool commit)
+            {
+                Operation op = new Operation(_ses.CacheName, OperationType.SesEnd) { Commit = commit };
+
+                ICollection<Operation> ops = GetOperations();
+
+                ops.Add(op);
+
+                DumpOperations(ops);
+            }
+
+            /// <summary>
+            /// Get collection with operations.
+            /// </summary>
+            /// <returns>Operations.</returns>
+            private ICollection<Operation> GetOperations()
+            {
+                object ops;
+
+                if (!_ses.Properties.TryGetValue("ops", out ops))
+                {
+                    ops = new List<Operation>();
+
+                    _ses.Properties["ops"] = ops;
+                }
+
+                return (ICollection<Operation>) ops;
+            } 
+        }
+
+        /// <summary>
+        /// Logged operation.
+        /// </summary>
+        internal class Operation
+        {
+            /// <summary>
+            /// Constructor.
+            /// </summary>
+            /// <param name="cacheName">Cache name.</param>
+            /// <param name="type">Operation type.</param>
+            public Operation(string cacheName, OperationType type)
+            {
+                CacheName = cacheName;
+                Type = type;
+            }
+
+            /// <summary>
+            /// Constructor.
+            /// </summary>
+            /// <param name="cacheName">Cache name.</param>
+            /// <param name="type">Operation type.</param>
+            /// <param name="key">Key.</param>
+            /// <param name="val">Value.</param>
+            public Operation(string cacheName, OperationType type, int key, int val) : this(cacheName, type)
+            {
+                Key = key;
+                Value = val;
+            }
+
+            /// <summary>
+            /// Cache name.
+            /// </summary>
+            public string CacheName { get; set; }
+            
+            /// <summary>
+            /// Operation type.
+            /// </summary>
+            public OperationType Type { get; set; }
+
+            /// <summary>
+            /// Key.
+            /// </summary>
+            public int Key { get; set; }
+
+            /// <summary>
+            /// Value.
+            /// </summary>
+            public int Value { get; set; }
+
+            /// <summary>
+            /// Commit flag.
+            /// </summary>
+            public bool Commit { get; set; }
+        }
+
+        /// <summary>
+        /// Operation types.
+        /// </summary>
+        internal enum OperationType
+        {
+            /** Write. */
+            Write,
+
+            /** Delete. */
+            Delete,
+
+            /** Session end. */
+            SesEnd
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
new file mode 100644
index 0000000..4e5e050
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Store
+{
+    using System;
+    using System.Collections;
+    using System.Collections.Generic;
+    using Apache.Ignite.Core.Cache;
+    using Apache.Ignite.Core.Impl;
+    using Apache.Ignite.Core.Portable;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Test key wrapping an integer index; two keys are equal when they have the same runtime type and index.
+    /// </summary>
+    class Key
+    {
+        private readonly int _idx;
+
+        public Key(int idx)
+        {
+            _idx = idx;
+        }
+
+        public int Index()
+        {
+            return _idx;
+        }
+
+        public override bool Equals(object obj)
+        {
+            if (obj == null || obj.GetType() != GetType())
+                return false;
+            // Runtime types matched above, so the cast below cannot fail.
+            Key key = (Key)obj;
+
+            return key._idx == _idx;
+        }
+
+        public override int GetHashCode()
+        {
+            return _idx;
+        }
+    }
+
+    /// <summary>
+    /// Test value wrapping an integer index; TestPutLoadPortables reads the field by its name "_idx", so keep that name.
+    /// </summary>
+    class Value
+    {
+        private int _idx;
+
+        public Value(int idx)
+        {
+            _idx = idx;
+        }
+
+        public int Index()
+        {
+            return _idx;
+        }
+    }
+
+    /// <summary>
+    /// Cache entry predicate that accepts only entries whose key is 105 or greater.
+    /// </summary>
+    [Serializable]
+    public class CacheEntryFilter : ICacheEntryFilter<int, string>
+    {
+        /** <inheritdoc /> */
+        public bool Invoke(ICacheEntry<int, string> entry)
+        {
+            return entry.Key >= 105;
+        }
+    }
+
+    /// <summary>
+    /// Cache store tests; store-side state is observed through the static members of CacheTestStore.
+    /// </summary>
+    public class CacheStoreTest
+    {
+        /** Name of the cache returned by PortableStoreCache (and therefore by Cache). */
+        private const string PortableStoreCacheName = "portable_store";
+
+        /** Name of the cache returned by ObjectStoreCache. */
+        private const string ObjectStoreCacheName = "object_store";
+
+        /** Name of the cache returned by CustomStoreCache. */
+        private const string CustomStoreCacheName = "custom_store";
+
+        /** Cache name template; TemplateStoreCache replaces the '*' with a new GUID. */
+        private const string TemplateStoreCacheName = "template_store*";
+
+        /// <summary>
+        /// Fixture set-up: starts an Ignite node from the Spring config below with Key and Value registered as portable types.
+        /// </summary>
+        [TestFixtureSetUp]
+        public void BeforeTests()
+        {
+            // NOTE(review): JVM debug is enabled unconditionally a few lines down - confirm that is intended.
+
+            TestUtils.KillProcesses();
+
+            TestUtils.JvmDebug = true;
+
+            IgniteConfigurationEx cfg = new IgniteConfigurationEx();
+            // GridName() is virtual, so a subclass can run the same fixture on a differently named grid.
+            cfg.GridName = GridName();
+            cfg.JvmClasspath = TestUtils.CreateTestClasspath();
+            cfg.JvmOptions = TestUtils.TestJavaOptions();
+            cfg.SpringConfigUrl = "config\\native-client-test-cache-store.xml";
+
+            PortableConfiguration portCfg = new PortableConfiguration();
+
+            portCfg.Types = new List<string> { typeof(Key).FullName, typeof(Value).FullName };
+
+            cfg.PortableConfiguration = portCfg;
+
+            Ignition.Start(cfg);
+        }
+
+        /// <summary>
+        /// Fixture tear-down: calls Ignition.StopAll(true).
+        /// </summary>
+        [TestFixtureTearDown]
+        public virtual void AfterTests()
+        {
+            Ignition.StopAll(true);
+        }
+
+        /// <summary>
+        /// Per-test set-up: writes the name of the starting test to the console.
+        /// </summary>
+        [SetUp]
+        public void BeforeTest()
+        {
+            Console.WriteLine("Test started: " + TestContext.CurrentContext.Test.Name);
+        }
+
+        /// <summary>
+        /// Per-test clean-up: clears the cache, asserts it is empty, resets CacheTestStore and logs the test name.
+        /// </summary>
+        [TearDown]
+        public void AfterTest()
+        {
+            var cache = Cache();
+
+            cache.Clear();
+
+            Assert.IsTrue(cache.IsEmpty, "Cache is not empty: " + cache.Size());
+            // Reset clears the store map and the static flags so one test's settings do not leak into the next.
+            CacheTestStore.Reset();
+
+            Console.WriteLine("Test finished: " + TestContext.CurrentContext.Test.Name);
+        }
+
+        [Test]
+        public void TestLoadCache()
+        {
+            var cache = Cache();
+
+            Assert.AreEqual(0, cache.Size());
+            // 100 and 10 are presumably forwarded to CacheTestStore.LoadCache as start/count (store wiring is in the Spring config).
+            cache.LoadCache(new CacheEntryFilter(), 100, 10);
+            // CacheEntryFilter accepts only keys >= 105, hence 5 of the keys 100..109 remain.
+            Assert.AreEqual(5, cache.Size());
+
+            for (int i = 105; i < 110; i++)
+                Assert.AreEqual("val_" + i, cache.Get(i));
+        }
+
+        [Test]
+        public void TestLocalLoadCache()
+        {
+            var cache = Cache();
+
+            Assert.AreEqual(0, cache.Size());
+            // Same arguments and expectations as TestLoadCache, but going through LocalLoadCache.
+            cache.LocalLoadCache(new CacheEntryFilter(), 100, 10);
+            // CacheEntryFilter accepts only keys >= 105, hence 5 of the keys 100..109 remain.
+            Assert.AreEqual(5, cache.Size());
+
+            for (int i = 105; i < 110; i++)
+                Assert.AreEqual("val_" + i, cache.Get(i));
+        }
+
+        [Test]
+        public void TestLoadCacheMetadata()
+        {
+            CacheTestStore.LoadObjects = true;
+            // With LoadObjects set, CacheTestStore.LoadCache emits Key/Value pairs instead of int/string pairs.
+            var cache = Cache();
+
+            Assert.AreEqual(0, cache.Size());
+
+            cache.LocalLoadCache(null, 0, 3);
+
+            Assert.AreEqual(3, cache.Size());
+            // Read the entry through a keep-portable view so its metadata can be inspected.
+            var meta = cache.WithKeepPortable<Key, IPortableObject>().Get(new Key(0)).Metadata();
+
+            Assert.NotNull(meta);
+
+            Assert.AreEqual("Value", meta.TypeName);
+        }
+
+        [Test]
+        public void TestLoadCacheAsync()
+        {
+            var cache = Cache().WithAsync();
+            // NOTE(review): this Size() is asserted without GetFuture, unlike the later Size() call - confirm it is meaningful.
+            Assert.AreEqual(0, cache.Size());
+
+            cache.LocalLoadCache(new CacheEntryFilter(), 100, 10);
+            // The result of the load call is taken from GetFuture; Get() presumably waits for completion.
+            var fut = cache.GetFuture<object>();
+
+            fut.Get();
+
+            Assert.IsTrue(fut.IsDone);
+            // From here on every call is followed by GetFuture to read that call's result.
+            cache.Size();
+            Assert.AreEqual(5, cache.GetFuture<int>().ToTask().Result);
+
+            for (int i = 105; i < 110; i++)
+            {
+                cache.Get(i);
+
+                Assert.AreEqual("val_" + i, cache.GetFuture<string>().ToTask().Result);
+            }
+        }
+
+        [Test]
+        public void TestPutLoad()
+        {
+            var cache = Cache();
+
+            cache.Put(1, "val");
+            // The store map is expected to contain the entry right after the put.
+            IDictionary map = StoreMap();
+
+            Assert.AreEqual(1, map.Count);
+
+            cache.LocalEvict(new[] { 1 });
+
+            Assert.AreEqual(0, cache.Size());
+            // With the entry evicted, Get is expected to return the value again (presumably via the store).
+            Assert.AreEqual("val", cache.Get(1));
+
+            Assert.AreEqual(1, cache.Size());
+        }
+
+        [Test]
+        public void TestPutLoadPortables()
+        {
+            var cache = PortableStoreCache<int, Value>();
+
+            cache.Put(1, new Value(1));
+
+            IDictionary map = StoreMap();
+
+            Assert.AreEqual(1, map.Count);
+            // For this cache the store map entry is expected to be an IPortableObject, not a Value instance.
+            IPortableObject v = (IPortableObject)map[1];
+            // "_idx" is the name of the private field declared in Value.
+            Assert.AreEqual(1, v.Field<int>("_idx"));
+
+            cache.LocalEvict(new[] { 1 });
+
+            Assert.AreEqual(0, cache.Size());
+            // With the entry evicted, Get is expected to return the value again (presumably via the store).
+            Assert.AreEqual(1, cache.Get(1).Index());
+
+            Assert.AreEqual(1, cache.Size());
+        }
+
+        [Test]
+        public void TestPutLoadObjects()
+        {
+            var cache = ObjectStoreCache<int, Value>();
+
+            cache.Put(1, new Value(1));
+
+            IDictionary map = StoreMap();
+
+            Assert.AreEqual(1, map.Count);
+            // For this cache the store map entry is expected to be a Value instance (contrast with TestPutLoadPortables).
+            Value v = (Value)map[1];
+
+            Assert.AreEqual(1, v.Index());
+
+            cache.LocalEvict(new[] { 1 });
+
+            Assert.AreEqual(0, cache.Size());
+            // With the entry evicted, Get is expected to return the value again (presumably via the store).
+            Assert.AreEqual(1, cache.Get(1).Index());
+
+            Assert.AreEqual(1, cache.Size());
+        }
+
+        [Test]
+        public void TestPutLoadAll()
+        {
+            var putMap = new Dictionary<int, string>();
+
+            for (int i = 0; i < 10; i++)
+                putMap.Add(i, "val_" + i);
+
+            var cache = Cache();
+
+            cache.PutAll(putMap);
+            // All ten entries are expected in the store map after PutAll.
+            IDictionary map = StoreMap();
+
+            Assert.AreEqual(10, map.Count);
+
+            for (int i = 0; i < 10; i++)
+                Assert.AreEqual("val_" + i, map[i]);
+            // Empty the cache; the GetAll below is then expected to return all ten entries again.
+            cache.Clear();
+
+            Assert.AreEqual(0, cache.Size());
+
+            ICollection<int> keys = new List<int>();
+
+            for (int i = 0; i < 10; i++)
+                keys.Add(i);
+
+            IDictionary<int, string> loaded = cache.GetAll(keys);
+
+            Assert.AreEqual(10, loaded.Count);
+
+            for (int i = 0; i < 10; i++)
+                Assert.AreEqual("val_" + i, loaded[i]);
+
+            Assert.AreEqual(10, cache.Size());
+        }
+
+        [Test]
+        public void TestRemove()
+        {
+            var cache = Cache();
+
+            for (int i = 0; i < 10; i++)
+                cache.Put(i, "val_" + i);
+
+            IDictionary map = StoreMap();
+
+            Assert.AreEqual(10, map.Count);
+            // Removing keys 0..4 one by one from the cache is expected to remove them from the store map too.
+            for (int i = 0; i < 5; i++)
+                cache.Remove(i);
+
+            Assert.AreEqual(5, map.Count);
+
+            for (int i = 5; i < 10; i++)
+                Assert.AreEqual("val_" + i, map[i]);
+        }
+
+        [Test]
+        public void TestRemoveAll()
+        {
+            var cache = Cache();
+
+            for (int i = 0; i < 10; i++)
+                cache.Put(i, "val_" + i);
+
+            IDictionary map = StoreMap();
+
+            Assert.AreEqual(10, map.Count);
+            // Removing keys 0..4 in one batch is expected to remove them from the store map too.
+            cache.RemoveAll(new List<int> { 0, 1, 2, 3, 4 });
+
+            Assert.AreEqual(5, map.Count);
+
+            for (int i = 5; i < 10; i++)
+                Assert.AreEqual("val_" + i, map[i]);
+        }
+
+        [Test]
+        public void TestTx()
+        {
+            var cache = Cache();
+
+            using (var tx = cache.Ignite.Transactions.TxStart())
+            {
+                CacheTestStore.ExpCommit = true;
+                // NOTE(review): no code visible in CacheTestStore reads ExpCommit - confirm the flag is still checked somewhere.
+                tx.AddMeta("meta", 100);
+
+                cache.Put(1, "val");
+
+                tx.Commit();
+            }
+            // After the commit the put is expected to be visible in the store map.
+            IDictionary map = StoreMap();
+
+            Assert.AreEqual(1, map.Count);
+
+            Assert.AreEqual("val", map[1]);
+        }
+
+        [Test]
+        public void TestLoadCacheMultithreaded()
+        {
+            CacheTestStore.LoadMultithreaded = true;
+            // With this flag CacheTestStore.LoadCache ignores its arguments and produces keys 0..999.
+            var cache = Cache();
+
+            Assert.AreEqual(0, cache.Size());
+
+            cache.LocalLoadCache(null, 0, null);
+
+            Assert.AreEqual(1000, cache.Size());
+
+            for (int i = 0; i < 1000; i++)
+                Assert.AreEqual("val_" + i, cache.Get(i));
+        }
+
+        [Test]
+        public void TestCustomStoreProperties()
+        {
+            var cache = CustomStoreCache();
+            Assert.IsNotNull(cache);
+            // Expected values are presumably assigned to IntProperty/StringProperty by the Spring config - TODO confirm.
+            Assert.AreEqual(42, CacheTestStore.intProperty);
+            Assert.AreEqual("String value", CacheTestStore.stringProperty);
+        }
+
+        [Test]
+        public void TestDynamicStoreStart()
+        {
+            var cache = TemplateStoreCache();
+
+            Assert.IsNotNull(cache);
+            // The cache name is used as the value, so the store map entry shows which cache performed the write.
+            cache.Put(1, cache.Name);
+
+            Assert.AreEqual(cache.Name, CacheTestStore.Map[1]);
+        }
+
+        /// <summary>
+        /// Gets the grid name for this test.
+        /// </summary>
+        /// <returns>Grid name; null in this base class.</returns>
+        protected virtual string GridName()
+        {
+            return null;
+        }
+
+        private IDictionary StoreMap()
+        {
+            return CacheTestStore.Map;
+        }
+        /** Default cache used by most tests: the portable-store cache viewed as int keys and string values. */
+        private ICache<int, string> Cache()
+        {
+            return PortableStoreCache<int, string>();
+        }
+
+        private ICache<TK, TV> PortableStoreCache<TK, TV>()
+        {
+            return Ignition.GetIgnite(GridName()).Cache<TK, TV>(PortableStoreCacheName);
+        }
+
+        private ICache<TK, TV> ObjectStoreCache<TK, TV>()
+        {
+            return Ignition.GetIgnite(GridName()).Cache<TK, TV>(ObjectStoreCacheName);
+        }
+
+        private ICache<int, string> CustomStoreCache()
+        {
+            return Ignition.GetIgnite(GridName()).Cache<int, string>(CustomStoreCacheName);
+        }
+
+        private ICache<int, string> TemplateStoreCache()
+        {
+            var cacheName = TemplateStoreCacheName.Replace("*", Guid.NewGuid().ToString());
+            // A fresh GUID replaces the '*', so every call asks for a cache under a new name.
+            return Ignition.GetIgnite(GridName()).GetOrCreateCache<int, string>(cacheName);
+        }
+    }
+
+    /// <summary>
+    /// Runs the CacheStoreTest fixture with the grid name "name" instead of null.
+    /// </summary>
+    public class NamedNodeCacheStoreTest : CacheStoreTest
+    {
+        /** <inheritDoc /> */
+        protected override string GridName()
+        {
+            return "name";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs
new file mode 100644
index 0000000..770ca83
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestParallelLoadStore.cs
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Store
+{
+    using System.Collections;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Threading;
+    using Apache.Ignite.Core.Cache.Store;
+
+    /// <summary>
+    /// Test cache store with parallel load; records the managed thread ids on which Parse runs.
+    /// </summary>
+    public class CacheTestParallelLoadStore : CacheParallelLoadStoreAdapter
+    {
+        /** Number of records produced by GetInputData (ids 0 .. InputDataLength - 1). */
+        public const int InputDataLength = 10000;
+
+        /** Managed thread ids on which Parse has run; used as a set (key and value are both the id). */
+        private static readonly ConcurrentDictionary<int, int> ThreadIds = new ConcurrentDictionary<int, int>();
+
+        /// <summary>
+        /// Gets the count of unique threads that entered Parse method.
+        /// </summary>
+        public static int UniqueThreadCount
+        {
+            get { return ThreadIds.Count; }
+        }
+
+        /// <summary>
+        /// Resets the test counters by clearing the recorded thread ids.
+        /// </summary>
+        public static void ResetCounters()
+        {
+            ThreadIds.Clear();
+        }
+
+        /** <inheritdoc /> */
+        protected override IEnumerable GetInputData()
+        {
+            return Enumerable.Range(0, InputDataLength).Select(x => new Record {Id = x, Name = "Test Record " + x});
+        }
+
+        /** <inheritdoc /> */
+        protected override KeyValuePair<object, object>? Parse(object inputRecord, params object[] args)
+        {
+            var threadId = Thread.CurrentThread.ManagedThreadId;
+            ThreadIds.GetOrAdd(threadId, threadId);
+            // args[0] is read as the minimum record id; records below it produce a null result.
+            var minId = (int)args[0];
+
+            var rec = (Record)inputRecord;
+            // The record id becomes the key and the whole record becomes the value.
+            return rec.Id >= minId
+                ? new KeyValuePair<object, object>(rec.Id, rec)
+                : (KeyValuePair<object, object>?) null;
+        }
+
+        /// <summary>
+        /// Test store record.
+        /// </summary>
+        public class Record
+        {
+            /// <summary>
+            /// Gets or sets the identifier.
+            /// </summary>
+            public int Id { get; set; }
+
+            /// <summary>
+            /// Gets or sets the name.
+            /// </summary>
+            public string Name { get; set; }
+        }
+    }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
new file mode 100644
index 0000000..9c381cb
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Store
+{
+    using System;
+    using System.Collections;
+    using System.Collections.Concurrent;
+    using System.Diagnostics;
+    using System.Diagnostics.CodeAnalysis;
+    using System.Linq;
+    using System.Threading;
+    using Apache.Ignite.Core.Cache.Store;
+    using Apache.Ignite.Core.Resource;
+
+    [SuppressMessage("ReSharper", "FieldCanBeMadeReadOnly.Local")]
+    public class CacheTestStore : ICacheStore
+    {
+        public static readonly IDictionary Map = new ConcurrentDictionary<object, object>();
+        /** Set by tests and cleared by Reset; no method of this class reads it. */
+        public static bool ExpCommit;
+        /** When true, LoadCache ignores its arguments and produces keys 0..999 via TestUtils.RunMultiThreaded. */
+        public static bool LoadMultithreaded;
+        /** When true, the single-threaded LoadCache path emits Key/Value objects instead of int/string pairs. */
+        public static bool LoadObjects;
+
+        [InstanceResource]
+        private IIgnite _grid = null;
+
+        [StoreSessionResource]
+#pragma warning disable 649
+        private ICacheStoreSession _ses;
+#pragma warning restore 649
+        /** Static backing field of IntProperty. */
+        public static int intProperty;
+        /** Static backing field of StringProperty. */
+        public static string stringProperty;
+        /** Clears Map and the three flags above; intProperty and stringProperty are left as they are. */
+        public static void Reset()
+        {
+            Map.Clear();
+
+            ExpCommit = false;
+            LoadMultithreaded = false;
+            LoadObjects = false;
+        }
+
+        public void LoadCache(Action<object, object> act, params object[] args)
+        {
+            Debug.Assert(_grid != null);
+            // Two modes: a fixed 0..999 multithreaded load, or a sequential load of the range given in args.
+            if (LoadMultithreaded)
+            {
+                int cnt = 0;
+                // cnt is shared by the workers; each Interlocked.Increment yields a distinct index.
+                TestUtils.RunMultiThreaded(() => {
+                    int i;
+
+                    while ((i = Interlocked.Increment(ref cnt) - 1) < 1000)
+                        act(i, "val_" + i);
+                }, 8);
+            }
+            else
+            {
+                int start = (int)args[0];
+                int cnt = (int)args[1];
+                // Emits keys start .. start + cnt - 1.
+                for (int i = start; i < start + cnt; i++)
+                {
+                    if (LoadObjects)
+                        act(new Key(i), new Value(i));
+                    else
+                        act(i, "val_" + i);
+                }
+            }
+        }
+
+        public object Load(object key)
+        {
+            Debug.Assert(_grid != null);
+            // Map is a ConcurrentDictionary used through IDictionary, whose indexer returns null for a missing key.
+            return Map[key];
+        }
+
+        public IDictionary LoadAll(ICollection keys)
+        {
+            Debug.Assert(_grid != null);
+            // Every requested key gets an entry; its value comes from Load (null when the key is absent).
+            return keys.OfType<object>().ToDictionary(key => key, Load);
+        }
+
+        public void Write(object key, object val)
+        {
+            Debug.Assert(_grid != null);
+
+            Map[key] = val;
+        }
+
+        public void WriteAll(IDictionary map)
+        {
+            Debug.Assert(_grid != null);
+
+            foreach (DictionaryEntry e in map)
+                Map[e.Key] = e.Value;
+        }
+
+        public void Delete(object key)
+        {
+            Debug.Assert(_grid != null);
+
+            Map.Remove(key);
+        }
+
+        public void DeleteAll(ICollection keys)
+        {
+            Debug.Assert(_grid != null);
+
+            foreach (object key in keys)
+                Map.Remove(key);
+        }
+
+        public void SessionEnd(bool commit)
+        {
+            Debug.Assert(_grid != null);
+            // NOTE(review): the commit argument is not compared with ExpCommit here - confirm whether it should be.
+            Debug.Assert(_ses != null);
+        }
+
+        public int IntProperty
+        {
+            get { return intProperty; }
+            set { intProperty = value; }
+        }
+
+        public string StringProperty
+        {
+            get { return stringProperty; }
+            set { stringProperty = value; }
+        }
+    }
+}


Mime
View raw message