ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [18/55] [abbrv] ignite git commit: IGNITE-1348: Moved GridGain's .Net module to Ignite.
Date Fri, 04 Sep 2015 16:27:32 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LifecycleTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LifecycleTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LifecycleTest.cs
new file mode 100644
index 0000000..3c38ef9
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LifecycleTest.cs
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests
+{
+    using System;
+    using System.Collections.Generic;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Impl;
+    using Apache.Ignite.Core.Lifecycle;
+    using Apache.Ignite.Core.Resource;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Lifecycle beans test.
+    /// </summary>
+    public class LifecycleTest
+    {
+        /** Configuration: without Java beans. */
+        private const string CfgNoBeans = "config//lifecycle//lifecycle-no-beans.xml";
+
+        /** Configuration: with Java beans. */
+        private const string CfgBeans = "config//lifecycle//lifecycle-beans.xml";
+
+        /** Whether to throw an error on lifecycle event. */
+        internal static bool ThrowErr;
+
+        /** Events: before start. */
+        internal static IList<Event> BeforeStartEvts;
+
+        /** Events: after start. */
+        internal static IList<Event> AfterStartEvts;
+
+        /** Events: before stop. */
+        internal static IList<Event> BeforeStopEvts;
+
+        /** Events: after stop. */
+        internal static IList<Event> AfterStopEvts;
+
+        /// <summary>
+        /// Set up routine: resets the error flag and replaces all recorded event lists with empty ones.
+        /// </summary>
+        [SetUp]
+        public void SetUp()
+        {
+            ThrowErr = false;
+
+            BeforeStartEvts = new List<Event>();
+            AfterStartEvts = new List<Event>();
+            BeforeStopEvts = new List<Event>();
+            AfterStopEvts = new List<Event>();
+        }
+
+        /// <summary>
+        /// Tear down routine.
+        /// </summary>
+        [TearDown]
+        public void TearDown()
+        {
+            Ignition.StopAll(true);
+        }
+
+        /// <summary>
+        /// Test without Java beans.
+        /// </summary>
+        [Test]
+        public void TestWithoutBeans()
+        {
+            // 1. Test start events.
+            // Two events per phase are expected: Start() registers two Bean instances.
+            IIgnite grid = Start(CfgNoBeans);
+
+            Assert.AreEqual(2, BeforeStartEvts.Count);
+            CheckEvent(BeforeStartEvts[0], null, null, 0, null);
+            CheckEvent(BeforeStartEvts[1], null, null, 0, null);
+
+            Assert.AreEqual(2, AfterStartEvts.Count);
+            CheckEvent(AfterStartEvts[0], grid, grid, 0, null);
+            CheckEvent(AfterStartEvts[1], grid, grid, 0, null);
+
+            // 2. Test stop events.
+            Ignition.Stop(grid.Name, false);
+
+            // Stopping must not produce additional start events.
+            Assert.AreEqual(2, BeforeStartEvts.Count);
+            Assert.AreEqual(2, AfterStartEvts.Count);
+
+            Assert.AreEqual(2, BeforeStopEvts.Count);
+            CheckEvent(BeforeStopEvts[0], grid, grid, 0, null);
+            CheckEvent(BeforeStopEvts[1], grid, grid, 0, null);
+
+            Assert.AreEqual(2, AfterStopEvts.Count);
+            CheckEvent(AfterStopEvts[0], grid, grid, 0, null);
+            CheckEvent(AfterStopEvts[1], grid, grid, 0, null);
+        }
+
+        /// <summary>
+        /// Test with Java beans.
+        /// </summary>
+        [Test]
+        public void TestWithBeans()
+        {
+            // 1. Test .Net start events.
+            // NOTE(review): four events per phase are expected; two beans come from Start(), the other
+            // two (one of them with properties 1 / "1") presumably come from the Spring config - confirm.
+            IIgnite grid = Start(CfgBeans);
+
+            Assert.AreEqual(4, BeforeStartEvts.Count);
+            CheckEvent(BeforeStartEvts[0], null, null, 0, null);
+            CheckEvent(BeforeStartEvts[1], null, null, 1, "1");
+            CheckEvent(BeforeStartEvts[2], null, null, 0, null);
+            CheckEvent(BeforeStartEvts[3], null, null, 0, null);
+
+            Assert.AreEqual(4, AfterStartEvts.Count);
+            CheckEvent(AfterStartEvts[0], grid, grid, 0, null);
+            CheckEvent(AfterStartEvts[1], grid, grid, 1, "1");
+            CheckEvent(AfterStartEvts[2], grid, grid, 0, null);
+            CheckEvent(AfterStartEvts[3], grid, grid, 0, null);
+
+            // 2. Test Java start events.
+            IList<int> res = grid.Compute().ExecuteJavaTask<IList<int>>(
+                "org.apache.ignite.platform.lifecycle.PlatformJavaLifecycleTask", null);
+
+            Assert.AreEqual(2, res.Count);
+            Assert.AreEqual(3, res[0]);
+            Assert.AreEqual(3, res[1]);
+
+            // 3. Test .Net stop events.
+            Ignition.Stop(grid.Name, false);
+
+            // Stopping must not produce additional start events.
+            Assert.AreEqual(4, BeforeStartEvts.Count);
+            Assert.AreEqual(4, AfterStartEvts.Count);
+
+            Assert.AreEqual(4, BeforeStopEvts.Count);
+            CheckEvent(BeforeStopEvts[0], grid, grid, 0, null);
+            CheckEvent(BeforeStopEvts[1], grid, grid, 1, "1");
+            CheckEvent(BeforeStopEvts[2], grid, grid, 0, null);
+            CheckEvent(BeforeStopEvts[3], grid, grid, 0, null);
+
+            Assert.AreEqual(4, AfterStopEvts.Count);
+            CheckEvent(AfterStopEvts[0], grid, grid, 0, null);
+            CheckEvent(AfterStopEvts[1], grid, grid, 1, "1");
+            CheckEvent(AfterStopEvts[2], grid, grid, 0, null);
+            CheckEvent(AfterStopEvts[3], grid, grid, 0, null);
+        }
+
+        /// <summary>
+        /// Test behavior when error is thrown from lifecycle beans:
+        /// grid start must fail with exactly <see cref="IgniteException"/>.
+        /// </summary>
+        [Test]
+        public void TestError()
+        {
+            ThrowErr = true;
+
+            // Assert.Throws is used instead of try / Assert.Fail / catch (Exception): with the latter,
+            // the assertion exception raised by Assert.Fail is itself caught and reported as a
+            // misleading "wrong exception type" failure when the grid unexpectedly starts.
+            Assert.Throws<IgniteException>(() => Start(CfgNoBeans));
+        }
+
+        /// <summary>
+        /// Start grid with two .Net lifecycle beans registered in the configuration.
+        /// </summary>
+        /// <param name="cfgPath">Spring configuration path.</param>
+        /// <returns>Grid.</returns>
+        private static IIgnite Start(string cfgPath)
+        {
+            TestUtils.JvmDebug = true;
+
+            IgniteConfiguration cfg = new IgniteConfiguration();
+
+            cfg.JvmClasspath = TestUtils.CreateTestClasspath();
+            cfg.JvmOptions = TestUtils.TestJavaOptions();
+            cfg.SpringConfigUrl = cfgPath;
+
+            cfg.LifecycleBeans = new List<ILifecycleBean> { new Bean(), new Bean() };
+
+            return Ignition.Start(cfg);
+        }
+
+        /// <summary>
+        /// Check event. Grid references wrapped into <see cref="IgniteProxy"/> are replaced
+        /// in the event with the proxy target before comparison.
+        /// </summary>
+        /// <param name="evt">Event.</param>
+        /// <param name="expGrid1">Expected grid 1.</param>
+        /// <param name="expGrid2">Expected grid 2.</param>
+        /// <param name="expProp1">Expected property 1.</param>
+        /// <param name="expProp2">Expected property 2.</param>
+        private static void CheckEvent(Event evt, IIgnite expGrid1, IIgnite expGrid2, int expProp1, string expProp2)
+        {
+            // A single "as" cast covers both the null check and the type check.
+            var proxy1 = evt.Grid1 as IgniteProxy;
+
+            if (proxy1 != null)
+                evt.Grid1 = proxy1.Target;
+
+            var proxy2 = evt.Grid2 as IgniteProxy;
+
+            if (proxy2 != null)
+                evt.Grid2 = proxy2.Target;
+
+            Assert.AreEqual(expGrid1, evt.Grid1);
+            Assert.AreEqual(expGrid2, evt.Grid2);
+            Assert.AreEqual(expProp1, evt.Prop1);
+            Assert.AreEqual(expProp2, evt.Prop2);
+        }
+    }
+
+    /// <summary>
+    /// Base class of the test lifecycle bean: a grid reference marked for injection plus an integer property.
+    /// </summary>
+    public abstract class AbstractBean
+    {
+        /** Grid reference marked with InstanceResource. */
+        [InstanceResource]
+        public IIgnite Grid1;
+
+        /** Integer property. */
+        public int Property1 { get; set; }
+    }
+
+    /// <summary>
+    /// Test lifecycle bean: records a snapshot of its own state into the static event lists
+    /// of <see cref="LifecycleTest"/> on every lifecycle event.
+    /// </summary>
+    public class Bean : AbstractBean, ILifecycleBean
+    {
+        /** Second grid reference marked with InstanceResource. */
+        [InstanceResource]
+        public IIgnite Grid2;
+
+        /** String property. */
+        public string Property2 { get; set; }
+
+        /** <inheritDoc /> */
+        public void OnLifecycleEvent(LifecycleEventType evtType)
+        {
+            // The test switches this flag on to simulate a failing bean.
+            if (LifecycleTest.ThrowErr)
+                throw new Exception("Lifecycle exception.");
+
+            // Capture the bean state as it is at the moment the event arrives.
+            var snapshot = new Event
+            {
+                Grid1 = Grid1,
+                Grid2 = Grid2,
+                Prop1 = Property1,
+                Prop2 = Property2
+            };
+
+            // Route the snapshot to the list matching the event type; other types are ignored.
+            if (evtType == LifecycleEventType.BeforeNodeStart)
+                LifecycleTest.BeforeStartEvts.Add(snapshot);
+            else if (evtType == LifecycleEventType.AfterNodeStart)
+                LifecycleTest.AfterStartEvts.Add(snapshot);
+            else if (evtType == LifecycleEventType.BeforeNodeStop)
+                LifecycleTest.BeforeStopEvts.Add(snapshot);
+            else if (evtType == LifecycleEventType.AfterNodeStop)
+                LifecycleTest.AfterStopEvts.Add(snapshot);
+        }
+    }
+
+    /// <summary>
+    /// Snapshot of a lifecycle bean state taken when a lifecycle event is received.
+    /// </summary>
+    public class Event
+    {
+        /** Value of the bean's Grid1 field at event time. */
+        public IIgnite Grid1;
+        /** Value of the bean's Grid2 field at event time. */
+        public IIgnite Grid2;
+        /** Value of the bean's Property1 at event time. */
+        public int Prop1;
+        /** Value of the bean's Property2 at event time. */
+        public string Prop2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LoadDllTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LoadDllTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LoadDllTest.cs
new file mode 100644
index 0000000..af9387c
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/LoadDllTest.cs
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests
+{
+    using System;
+    using System.CodeDom.Compiler;
+    using System.Collections.Generic;
+    using System.IO;
+    using System.Linq;
+    using Apache.Ignite.Core.Common;
+    using Microsoft.CSharp;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Dll loading test.
+    /// </summary>
+    public class LoadDllTest
+    {
+        /** Spring configuration used by every test of this class. */
+        private const string SpringCfg = "config\\start-test-grid3.xml";
+
+        /// <summary>
+        /// Set up routine: calls <c>TestUtils.KillProcesses()</c> before each test.
+        /// </summary>
+        [SetUp]
+        public void SetUp()
+        {
+            TestUtils.KillProcesses();
+        }
+
+        /// <summary>
+        /// Tear down routine: stops all grids after each test.
+        /// </summary>
+        [TearDown]
+        public void TearDown()
+        {
+            Ignition.StopAll(true);
+        }
+
+        /// <summary>
+        /// Tests loading of an assembly specified by its full name (System.Data.Linq).
+        /// </summary>
+        [Test]
+        public void TestLoadFromGac()
+        {
+            Assert.False(IsLoaded("System.Data.Linq"));
+
+            var cfg = CreateConfiguration(
+                "System.Data.Linq,Culture=neutral,Version=1.0.0.0,PublicKeyToken=b77a5c561934e089");
+
+            var grid = Ignition.Start(cfg);
+
+            Assert.IsNotNull(grid);
+
+            Assert.True(IsLoaded("System.Data.Linq"));
+        }
+
+        /// <summary>
+        /// Tests loading of an assembly specified by a relative file name with extension.
+        /// </summary>
+        [Test]
+        public void TestLoadFromCurrentDir()
+        {
+            Assert.False(IsLoaded("testDll"));
+
+            GenerateDll("testDll.dll");
+
+            var grid = Ignition.Start(CreateConfiguration("testDll.dll"));
+
+            Assert.IsNotNull(grid);
+
+            Assert.True(IsLoaded("testDll"));
+        }
+
+        /// <summary>
+        /// Tests loading of all assemblies from a directory specified in the configuration.
+        /// The directory also contains a non-dll file.
+        /// </summary>
+        [Test]
+        public void TestLoadAllDllInDir()
+        {
+            // Path.Combine avoids the doubled separator produced by GetTempPath() + "/name".
+            var dirInfo = Directory.CreateDirectory(Path.Combine(Path.GetTempPath(), "testDlls"));
+
+            Assert.False(IsLoaded("dllFromDir1"));
+            Assert.False(IsLoaded("dllFromDir2"));
+
+            GenerateDll(Path.Combine(dirInfo.FullName, "dllFromDir1.dll"));
+            GenerateDll(Path.Combine(dirInfo.FullName, "dllFromDir2.dll"));
+            File.WriteAllText(Path.Combine(dirInfo.FullName, "notADll.txt"), "notADll");
+
+            var grid = Ignition.Start(CreateConfiguration(dirInfo.FullName));
+
+            Assert.IsNotNull(grid);
+
+            Assert.True(IsLoaded("dllFromDir1"));
+            Assert.True(IsLoaded("dllFromDir2"));
+        }
+
+        /// <summary>
+        /// Tests loading of an assembly specified by a name without the file extension.
+        /// </summary>
+        [Test]
+        public void TestLoadFromCurrentDirByName()
+        {
+            Assert.False(IsLoaded("testDllByName"));
+
+            GenerateDll("testDllByName.dll");
+
+            var grid = Ignition.Start(CreateConfiguration("testDllByName"));
+
+            Assert.IsNotNull(grid);
+
+            Assert.True(IsLoaded("testDllByName"));
+        }
+
+        /// <summary>
+        /// Tests loading of an assembly specified by an absolute path in the temp directory.
+        /// </summary>
+        [Test]
+        public void TestLoadByAbsoluteUri()
+        {
+            var dllPath = Path.Combine(Path.GetTempPath(), "tempDll.dll");
+            Assert.False(IsLoaded("tempDll"));
+
+            GenerateDll(dllPath);
+
+            var grid = Ignition.Start(CreateConfiguration(dllPath));
+
+            Assert.IsNotNull(grid);
+
+            Assert.True(IsLoaded("tempDll"));
+        }
+
+        /// <summary>
+        /// Tests that grid start fails with <see cref="IgniteException"/> when a configured assembly does not exist.
+        /// </summary>
+        [Test]
+        public void TestLoadUnexistingLibrary()
+        {
+            var cfg = CreateConfiguration("unexistingAssembly.820482.dll");
+
+            try
+            {
+                Ignition.Start(cfg);
+
+                Assert.Fail("Grid has been started with broken configuration.");
+            }
+            catch (IgniteException)
+            {
+                // Expected: start must fail for a missing assembly.
+            }
+        }
+
+        /// <summary>
+        /// Creates the grid configuration shared by all tests, with a single entry in the assemblies list.
+        /// </summary>
+        /// <param name="assembly">Assembly name, file path or directory to put into the assemblies list.</param>
+        /// <returns>Configuration.</returns>
+        private static IgniteConfiguration CreateConfiguration(string assembly)
+        {
+            return new IgniteConfiguration
+            {
+                SpringConfigUrl = SpringCfg,
+                Assemblies = new List<string> {assembly},
+                JvmClasspath = TestUtils.CreateTestClasspath()
+            };
+        }
+
+        /// <summary>
+        /// Compiles a trivial class library into the given output file.
+        /// </summary>
+        /// <param name="outputPath">Path of the assembly file to produce.</param>
+        private void GenerateDll(string outputPath)
+        {
+            var parameters = new CompilerParameters
+            {
+                GenerateExecutable = false,
+                OutputAssembly = outputPath
+            };
+
+            var src = "namespace GridGain.Client.Test { public class Foo {}}";
+
+            // The provider is disposable; compiling through it directly also removes the need
+            // for the obsolete CreateCompiler() call.
+            using (var codeProvider = new CSharpCodeProvider())
+            {
+                var results = codeProvider.CompileAssemblyFromSource(parameters, src);
+
+                Assert.False(results.Errors.HasErrors, "Failed to compile test assembly: " + outputPath);
+            }
+        }
+
+        /// <summary>
+        /// Determines whether the specified assembly is loaded.
+        /// </summary>
+        /// <param name="asmName">Name of the assembly.</param>
+        /// <returns>True if an assembly with this simple name is loaded into the current application domain.</returns>
+        private static bool IsLoaded(string asmName)
+        {
+            return AppDomain.CurrentDomain.GetAssemblies().Any(a => a.GetName().Name == asmName);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MarshallerTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MarshallerTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MarshallerTest.cs
new file mode 100644
index 0000000..d3af288
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MarshallerTest.cs
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests
+{
+    using Apache.Ignite.Core.Common;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Test marshaller initialization.
+    /// </summary>
+    public class MarshallerTest
+    {
+        /// <summary>
+        /// Tests the default marshaller.
+        /// By default, portable marshaller is used.
+        /// </summary>
+        [Test]
+        public void TestDefaultMarhsaller()
+        {
+            CheckPutGet("config\\marshaller-default.xml");
+        }
+
+        /// <summary>
+        /// Tests the portable marshaller.
+        /// PortableMarshaller can be specified explicitly in config.
+        /// </summary>
+        [Test]
+        public void TestPortableMarhsaller()
+        {
+            CheckPutGet("config\\marshaller-portable.xml");
+        }
+
+        /// <summary>
+        /// Tests the invalid marshaller: grid start must fail with <see cref="IgniteException"/>.
+        /// </summary>
+        [Test]
+        public void TestInvalidMarshaller()
+        {
+            Assert.Throws<IgniteException>(() => Ignition.Start("config\\marshaller-invalid.xml"));
+        }
+
+        /// <summary>
+        /// Starts a grid from the given configuration, puts a value into the cache obtained
+        /// with a null name and checks that the same value is read back. The grid is disposed afterwards.
+        /// </summary>
+        /// <param name="cfgPath">Spring configuration path.</param>
+        private static void CheckPutGet(string cfgPath)
+        {
+            using (var ignite = Ignition.Start(cfgPath))
+            {
+                var intCache = ignite.GetOrCreateCache<int, int>(null);
+
+                intCache.Put(1, 1);
+
+                Assert.AreEqual(1, intCache.Get(1));
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
new file mode 100644
index 0000000..abb8e2f
--- /dev/null
+++ b/modules/platform/src/test/dotnet/Apache.Ignite.Core.Tests/MessagingTest.cs
@@ -0,0 +1,646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests
+{
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Diagnostics.CodeAnalysis;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Messaging;
+    using Apache.Ignite.Core.Resource;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// <see cref="IMessaging"/> tests.
+    /// </summary>
+    public class MessagingTest
+    {
+        /** First grid; started in SetUp from compute-grid1.xml. */
+        private IIgnite _grid1;
+
+        /** Second grid; started in SetUp from compute-grid2.xml. */
+        private IIgnite _grid2;
+
+        /** Third grid; started in SetUp from compute-grid3.xml. */
+        private IIgnite _grid3;
+
+        /** Message id. NOTE(review): presumably the counter behind NextId()/NextMessage(), which are not visible here - confirm. */
+        public static int MessageId;
+
+        /// <summary>
+        /// Executes before each test: starts the three grids used by the tests.
+        /// </summary>
+        [SetUp]
+        public void SetUp()
+        {
+            // Common directory of the three grid configurations.
+            const string cfgDir = "config\\compute\\";
+
+            _grid1 = Ignition.Start(Configuration(cfgDir + "compute-grid1.xml"));
+            _grid2 = Ignition.Start(Configuration(cfgDir + "compute-grid2.xml"));
+            _grid3 = Ignition.Start(Configuration(cfgDir + "compute-grid3.xml"));
+        }
+
+        /// <summary>
+        /// Executes after each test: runs the handle registry and messaging failure checks,
+        /// then stops all grids regardless of the outcome of those checks.
+        /// </summary>
+        [TearDown]
+        public virtual void TearDown()
+        {
+            try
+            {
+                // NOTE(review): 1000 is presumably a timeout in milliseconds - confirm against TestUtils.
+                TestUtils.AssertHandleRegistryIsEmpty(1000, _grid1, _grid2, _grid3);
+
+                MessagingTestHelper.AssertFailures();
+            }
+            finally 
+            {
+                // Stop all grids between tests to drop any hanging messages
+                Ignition.StopAll(true);
+            }
+        }
+
+        /// <summary>
+        /// Tests LocalListen with three topics in turn: null, a string and the value returned by NextId().
+        /// </summary>
+        [Test]
+        public void TestLocalListen()
+        {
+            TestLocalListen(null);
+            TestLocalListen("string topic");
+            TestLocalListen(NextId());
+        }
+
+        /// <summary>
+        /// Tests LocalListen for the given topic: sending from all grids, a different topic,
+        /// repeated subscriptions of the same listener, message type mismatch and unsubscription.
+        /// </summary>
+        /// <param name="topic">Topic to listen on and send to.</param>
+        [SuppressMessage("ReSharper", "AccessToModifiedClosure")]
+        public void TestLocalListen(object topic)
+        {
+            var messaging = _grid1.Message();
+            var listener = MessagingTestHelper.GetListener();
+            messaging.LocalListen(listener, topic);
+
+            // Test sending
+            CheckSend(topic);
+            CheckSend(topic, _grid2);
+            CheckSend(topic, _grid3);
+
+            // Test different topic
+            CheckNoMessage(NextId());
+            CheckNoMessage(NextId(), _grid2);
+
+            // Test multiple subscriptions for the same filter
+            messaging.LocalListen(listener, topic);
+            messaging.LocalListen(listener, topic);
+            CheckSend(topic, repeatMultiplier: 3); // expect all messages repeated 3 times
+
+            messaging.StopLocalListen(listener, topic);
+            CheckSend(topic, repeatMultiplier: 2); // expect all messages repeated 2 times
+
+            messaging.StopLocalListen(listener, topic);
+            CheckSend(topic); // back to 1 listener
+
+            // Test message type mismatch
+            var ex = Assert.Throws<IgniteException>(() => messaging.Send(1.1, topic));
+            Assert.AreEqual("Unable to cast object of type 'System.Double' to type 'System.String'.", ex.Message);
+
+            // Test end listen
+            MessagingTestHelper.ListenResult = false;
+            CheckSend(topic, single: true); // we'll receive one more and then unsubscribe because of delegate result.
+            CheckNoMessage(topic);
+
+            // Start again
+            MessagingTestHelper.ListenResult = true;
+            messaging.LocalListen(listener, topic);
+            CheckSend(topic);
+
+            // Stop
+            messaging.StopLocalListen(listener, topic);
+            CheckNoMessage(topic);
+        }
+
+        /// <summary>
+        /// Tests LocalListen with projection for three topics in turn:
+        /// null, a string and the value returned by NextId().
+        /// </summary>
+        [Test]
+        public void TestLocalListenProjection()
+        {
+            TestLocalListenProjection(null);
+            TestLocalListenProjection("prj");
+            TestLocalListenProjection(NextId());
+        }
+
+        /// <summary>
+        /// Tests LocalListen with projection: messages sent through a projection over the local nodes
+        /// of grid1 and grid2 must not be received by a listener registered on grid3.
+        /// </summary>
+        /// <param name="topic">Topic to listen on and send to.</param>
+        private void TestLocalListenProjection(object topic)
+        {
+            // Raised by the grid3 listener as soon as it receives anything.
+            var outsiderNotified = false;
+
+            var outsiderListener = new MessageFilter<string>((nodeId, msg) =>
+            {
+                outsiderNotified = true;
+                return true;
+            });
+
+            _grid3.Message().LocalListen(outsiderListener, topic);
+
+            // Messaging restricted to the local nodes of grid1 and grid2.
+            var projection = _grid1.Cluster.ForNodes(_grid1.Cluster.LocalNode, _grid2.Cluster.LocalNode);
+            var prjMessaging = projection.Message();
+            var prjListener = MessagingTestHelper.GetListener();
+            prjMessaging.LocalListen(prjListener, topic);
+
+            CheckSend(topic, msg: prjMessaging);
+            Assert.IsFalse(outsiderNotified, "Grid3 should not get messages");
+
+            CheckSend(topic, _grid2, msg: prjMessaging);
+            Assert.IsFalse(outsiderNotified, "Grid3 should not get messages");
+
+            prjMessaging.StopLocalListen(prjListener, topic);
+            _grid3.Message().StopLocalListen(outsiderListener, topic);
+        }
+
+        /// <summary>
+        /// Tests LocalListen in multithreaded mode: listeners are subscribed and unsubscribed
+        /// concurrently while messages are being sent from a background task, and it is verified
+        /// that unsubscribed listeners stop receiving messages.
+        /// </summary>
+        [Test]
+        [SuppressMessage("ReSharper", "AccessToModifiedClosure")]
+        [Category(TestUtils.CategoryIntensive)]
+        public void TestLocalListenMultithreaded()
+        {
+            // NOTE(review): presumably the thread count and duration (seconds) of each
+            // RunMultiThreaded call - confirm against TestUtils.
+            const int threadCnt = 20;
+            const int runSeconds = 20;
+
+            var messaging = _grid1.Message();
+
+            // Background senders: each iteration sends one message and sleeps for 50 ms.
+            var senders = Task.Factory.StartNew(() => TestUtils.RunMultiThreaded(() =>
+            {
+                messaging.Send((object) NextMessage());
+                Thread.Sleep(50);
+            }, threadCnt, runSeconds));
+
+
+            // Number of messages received by the listener shared between all threads.
+            var sharedReceived = 0;
+
+            var sharedListener = new MessageFilter<string>((id, x) =>
+            {
+                Interlocked.Increment(ref sharedReceived);
+                Thread.MemoryBarrier();
+                return true;
+            });
+
+            TestUtils.RunMultiThreaded(() =>
+            {
+                // Check that listen/stop work concurrently
+                messaging.LocalListen(sharedListener);
+
+                for (int i = 0; i < 100; i++)
+                {
+                    messaging.LocalListen(sharedListener);
+                    messaging.StopLocalListen(sharedListener);
+                }
+
+                // Per-iteration listener which unsubscribes itself by returning false once stopLocal is set.
+                var localReceived = 0;
+                var stopLocal = 0;
+
+                var localListener = new MessageFilter<string>((id, x) =>
+                {
+                    Interlocked.Increment(ref localReceived);
+                    Thread.MemoryBarrier();
+                    return Thread.VolatileRead(ref stopLocal) == 0;
+                });
+
+                messaging.LocalListen(localListener);
+
+                Thread.Sleep(100);
+
+                Thread.VolatileWrite(ref stopLocal, 1);
+
+                // Give the listener time to observe the flag and return false.
+                Thread.Sleep(1000);
+
+                var result = Thread.VolatileRead(ref localReceived);
+
+                Thread.Sleep(100);
+
+                // Check that unsubscription worked properly
+                Assert.AreEqual(result, Thread.VolatileRead(ref localReceived));
+
+                // Balances the LocalListen call made at the start of this iteration.
+                messaging.StopLocalListen(sharedListener);
+
+            }, threadCnt, runSeconds);
+
+            senders.Wait();
+
+            Thread.Sleep(100);
+
+            var sharedResult = Thread.VolatileRead(ref sharedReceived);
+
+            // One more message after all unsubscriptions: the shared counter must not change.
+            messaging.Send((object)NextMessage());
+
+            Thread.Sleep(MessagingTestHelper.MessageTimeout);
+
+            // Check that unsubscription worked properly
+            Assert.AreEqual(sharedResult, Thread.VolatileRead(ref sharedReceived));
+        }
+
+        /// <summary>
+        /// Tests RemoteListen in synchronous mode for three topics in turn:
+        /// null, a string and the value returned by NextId().
+        /// </summary>
+        [Test]
+        public void TestRemoteListen()
+        {
+            TestRemoteListen(null);
+            TestRemoteListen("string topic");
+            TestRemoteListen(NextId());
+        }
+
+        /// <summary>
+        /// Tests RemoteListen with async mode enabled for three topics in turn:
+        /// null, a string and the value returned by NextId().
+        /// </summary>
+        [Test]
+        public void TestRemoteListenAsync()
+        {
+            TestRemoteListen(null, true);
+            TestRemoteListen("string topic", true);
+            TestRemoteListen(NextId(), true);
+        }
+
+        /// <summary>
+        /// Tests RemoteListen for the given topic: sending, a different topic, a second subscription
+        /// of the same listener, message type mismatch and unsubscription.
+        /// </summary>
+        /// <param name="topic">Topic to listen on and send to.</param>
+        /// <param name="async">Whether to use messaging obtained through WithAsync();
+        /// in that case operation results are taken from the messaging future.</param>
+        public void TestRemoteListen(object topic, bool async = false)
+        {
+            var messaging = async ? _grid1.Message().WithAsync() : _grid1.Message();
+
+            var listener = MessagingTestHelper.GetListener();
+            var listenId = messaging.RemoteListen(listener, topic);
+
+            if (async)
+                listenId = messaging.GetFuture<Guid>().Get();
+
+            // Test sending
+            CheckSend(topic, msg: messaging, remoteListen: true);
+
+            // Test different topic
+            CheckNoMessage(NextId());
+
+            // Test multiple subscriptions for the same filter
+            var listenId2 = messaging.RemoteListen(listener, topic);
+
+            if (async)
+                listenId2 = messaging.GetFuture<Guid>().Get();
+
+            CheckSend(topic, msg: messaging, remoteListen: true, repeatMultiplier: 2); // expect twice the messages
+
+            messaging.StopRemoteListen(listenId2);
+
+            if (async)
+                messaging.GetFuture().Get();
+
+            CheckSend(topic, msg: messaging, remoteListen: true); // back to normal after unsubscription
+
+            // Test message type mismatch
+            var ex = Assert.Throws<IgniteException>(() => messaging.Send(1.1, topic));
+            Assert.AreEqual("Unable to cast object of type 'System.Double' to type 'System.String'.", ex.Message);
+
+            // Test end listen
+            messaging.StopRemoteListen(listenId);
+
+            if (async)
+                messaging.GetFuture().Get();
+
+            CheckNoMessage(topic);
+        }
+
+        /// <summary>
+        /// Runs the projection-based RemoteListen scenario three times: with a null topic,
+        /// with a string topic and with a newly generated integer topic.
+        /// </summary>
+        [Test]
+        public void TestRemoteListenProjection()
+        {
+            const string stringTopic = "string topic";
+
+            TestRemoteListenProjection(null);
+            TestRemoteListenProjection(stringTopic);
+
+            // The ID is generated only after the first two runs have finished.
+            object idTopic = NextId();
+            TestRemoteListenProjection(idTopic);
+        }
+
+        /// <summary>
+        /// Subscribes a remote listener through a cluster group made of the local nodes of both grids,
+        /// verifies delivery, then unsubscribes and verifies that nothing is delivered any more.
+        /// </summary>
+        /// <param name="topic">Topic to listen and send on.</param>
+        private void TestRemoteListenProjection(object topic)
+        {
+            var firstNode = _grid1.Cluster.LocalNode;
+            var secondNode = _grid2.Cluster.LocalNode;
+
+            var projection = _grid1.Cluster.ForNodes(firstNode, secondNode);
+            var projectionMessaging = projection.Message();
+
+            var filter = MessagingTestHelper.GetListener();
+            var subscriptionId = projectionMessaging.RemoteListen(filter, topic);
+
+            CheckSend(topic, msg: projectionMessaging, remoteListen: true);
+
+            projectionMessaging.StopRemoteListen(subscriptionId);
+
+            CheckNoMessage(topic);
+        }
+
+        /// <summary>
+        /// Tests RemoteListen in multithreaded mode: remote listeners are added and stopped
+        /// while background threads keep sending messages, then the test checks that no
+        /// listener is left subscribed.
+        /// </summary>
+        [Test]
+        [Category(TestUtils.CategoryIntensive)]
+        public void TestRemoteListenMultithreaded()
+        {
+            const int threadCnt = 20;
+            const int runSeconds = 20;
+
+            var messaging = _grid1.Message();
+
+            // Background senders; presumably RunMultiThreaded runs the action on threadCnt threads
+            // for runSeconds seconds - verify against TestUtils.
+            var senders = Task.Factory.StartNew(() => TestUtils.RunMultiThreaded(() =>
+            {
+                MessagingTestHelper.ClearReceived(int.MaxValue);
+                messaging.Send((object) NextMessage());
+                Thread.Sleep(50);
+            }, threadCnt, runSeconds));
+
+            var sharedListener = MessagingTestHelper.GetListener();
+
+            // Add some listeners to be stopped by filter result
+            for (int i = 0; i < 100; i++)
+                messaging.RemoteListen(sharedListener);
+
+            TestUtils.RunMultiThreaded(() =>
+            {
+                // Check that listen/stop work concurrently
+                messaging.StopRemoteListen(messaging.RemoteListen(sharedListener));
+
+            }, threadCnt, runSeconds);
+
+            MessagingTestHelper.ListenResult = false;
+
+            // Send a message to make filters return false
+            messaging.Send((object) NextMessage());
+
+            // Wait for all to unsubscribe
+            Thread.Sleep(MessagingTestHelper.MessageTimeout);
+
+            MessagingTestHelper.ListenResult = true;
+
+            // Wait for senders to stop
+            senders.Wait();
+
+            var sharedResult = MessagingTestHelper.ReceivedMessages.Count;
+
+            messaging.Send((object) NextMessage());
+
+            Thread.Sleep(MessagingTestHelper.MessageTimeout);
+
+            // Check that unsubscription worked properly
+            Assert.AreEqual(sharedResult, MessagingTestHelper.ReceivedMessages.Count);
+        }
+
+        /// <summary>
+        /// Sends messages in various ways and verifies that they are received correctly:
+        /// a single message, a batch of messages, and the same batch sent one by one with SendOrdered.
+        /// </summary>
+        /// <param name="topic">Topic.</param>
+        /// <param name="grid">The grid to use when <paramref name="msg"/> is null; defaults to _grid1.</param>
+        /// <param name="msg">Messaging to use. When null, the messaging of <paramref name="grid"/> is used
+        /// and verification is done against that grid's local cluster group.</param>
+        /// <param name="remoteListen">Whether to expect remote listeners.</param>
+        /// <param name="single">When true, only check one message.</param>
+        /// <param name="repeatMultiplier">Expected message count multiplier.</param>
+        private void CheckSend(object topic = null, IIgnite grid = null,
+            IMessaging msg = null, bool remoteListen = false, bool single = false, int repeatMultiplier = 1)
+        {
+            IClusterGroup cluster;
+
+            if (msg == null)
+            {
+                grid = grid ?? _grid1;
+                msg = grid.Message();
+                cluster = grid.Cluster.ForLocal();
+            }
+            else
+            {
+                cluster = msg.ClusterGroup;
+            }
+
+            // Messages will repeat due to multiple nodes listening
+            var listeningNodes = remoteListen ? cluster.Nodes().Count : 1;
+            var expectedRepeat = repeatMultiplier * listeningNodes;
+
+            var messages = Enumerable.Range(1, 10).Select(x => NextMessage()).OrderBy(x => x).ToList();
+
+            // Single message
+            MessagingTestHelper.ClearReceived(expectedRepeat);
+            msg.Send((object) messages[0], topic);
+            MessagingTestHelper.VerifyReceive(cluster, messages.Take(1), m => m.ToList(), expectedRepeat);
+
+            if (single)
+                return;
+
+            // Multiple messages (receive order is undefined)
+            MessagingTestHelper.ClearReceived(messages.Count * expectedRepeat);
+            msg.Send(messages, topic);
+            MessagingTestHelper.VerifyReceive(cluster, messages, m => m.OrderBy(x => x), expectedRepeat);
+
+            // Multiple messages, ordered
+            MessagingTestHelper.ClearReceived(messages.Count * expectedRepeat);
+
+            foreach (var message in messages)
+                msg.SendOrdered(message, topic, MessagingTestHelper.MessageTimeout);
+
+            if (remoteListen)
+            {
+                // In remote scenario messages get mixed up due to different timing on different nodes
+                MessagingTestHelper.VerifyReceive(cluster, messages, m => m.OrderBy(x => x), expectedRepeat);
+            }
+            else
+            {
+                // Received messages are kept in a ConcurrentStack, which enumerates the most recent
+                // item first, so reversing restores the order in which they arrived.
+                MessagingTestHelper.VerifyReceive(cluster, messages, m => m.Reverse(), expectedRepeat);
+            }
+        }
+
+        /// <summary>
+        /// Checks that no message has arrived: sends one message on the given topic, waits for the
+        /// message timeout and then asserts that listeners recorded no failures.
+        /// </summary>
+        /// <param name="topic">Topic to send on.</param>
+        /// <param name="grid">Grid to send from; defaults to _grid1.</param>
+        private void CheckNoMessage(object topic, IIgnite grid = null)
+        {
+            // this will result in an exception in case of a message
+            MessagingTestHelper.ClearReceived(0);
+
+            // Cast to object, as every other single-message Send call in this class does. Without it
+            // the string argument presumably binds to the collection overload of Send and is sent
+            // character by character - confirm against IMessaging.
+            (grid ?? _grid1).Message().Send((object) NextMessage(), topic);
+
+            Thread.Sleep(MessagingTestHelper.MessageTimeout);
+
+            MessagingTestHelper.AssertFailures();
+        }
+
+        /// <summary>
+        /// Builds an Ignite configuration for the given Spring config, using the test classpath
+        /// and test JVM options provided by TestUtils.
+        /// </summary>
+        /// <param name="springConfigUrl">Spring configuration URL.</param>
+        /// <returns>New configuration instance.</returns>
+        private static IgniteConfiguration Configuration(string springConfigUrl)
+        {
+            var cfg = new IgniteConfiguration();
+
+            cfg.SpringConfigUrl = springConfigUrl;
+            cfg.JvmClasspath = TestUtils.CreateTestClasspath();
+            cfg.JvmOptions = TestUtils.TestJavaOptions();
+
+            return cfg;
+        }
+
+        /// <summary>
+        /// Builds the next message text: the next sequential ID, an underscore and the name
+        /// of the currently running test.
+        /// </summary>
+        /// <returns>Message text.</returns>
+        private static string NextMessage()
+        {
+            object sequentialId = NextId();
+            object testName = TestContext.CurrentContext.Test.Name;
+
+            return string.Concat(sequentialId, "_", testName);
+        }
+
+        /// <summary>
+        /// Atomically increments the shared message ID counter and returns the new value.
+        /// </summary>
+        /// <returns>Incremented ID.</returns>
+        private static int NextId()
+        {
+            var nextId = Interlocked.Increment(ref MessageId);
+
+            return nextId;
+        }
+    }
+
+    /// <summary>
+    /// Messaging test helper class. Holds the static state updated by listeners created through
+    /// <see cref="GetListener"/> and provides helpers to reset and verify that state.
+    /// </summary>
+    [Serializable]
+    public static class MessagingTestHelper
+    {
+        /** Messages received by listeners; the most recently received message is on top. */
+        public static readonly ConcurrentStack<string> ReceivedMessages = new ConcurrentStack<string>();
+
+        /** Descriptions of exceptions thrown inside listeners. */
+        public static readonly ConcurrentStack<string> Failures = new ConcurrentStack<string>();
+
+        /** Signalled once per received message; reset to the expected message count by ClearReceived. */
+        public static readonly CountdownEvent ReceivedEvent = new CountdownEvent(0);
+
+        /** Originating node IDs passed to listeners. */
+        public static readonly ConcurrentStack<Guid> LastNodeIds = new ConcurrentStack<Guid>();
+
+        /** Value returned by listeners created through GetListener. */
+        public static volatile bool ListenResult = true;
+
+        /** Time to wait for message delivery. */
+        public static readonly TimeSpan MessageTimeout = TimeSpan.FromMilliseconds(700);
+
+        /// <summary>
+        /// Clears received message information.
+        /// </summary>
+        /// <param name="expectedCount">The expected count of messages to be received.</param>
+        public static void ClearReceived(int expectedCount)
+        {
+            ReceivedMessages.Clear();
+            ReceivedEvent.Reset(expectedCount);
+            LastNodeIds.Clear();
+        }
+
+        /// <summary>
+        /// Verifies received messages against expected messages.
+        /// </summary>
+        /// <param name="cluster">Cluster.</param>
+        /// <param name="expectedMessages">Expected messages.</param>
+        /// <param name="resultFunc">Result transform function.</param>
+        /// <param name="expectedRepeat">Expected repeat count.</param>
+        public static void VerifyReceive(IClusterGroup cluster, IEnumerable<string> expectedMessages,
+            Func<IEnumerable<string>, IEnumerable<string>> resultFunc, int expectedRepeat)
+        {
+            // Check that the expected message count has been received; Wait returns false
+            // if the count was not reached within the timeout.
+            Assert.IsTrue(ReceivedEvent.Wait(MessageTimeout));
+
+            expectedMessages = expectedMessages.SelectMany(x => Enumerable.Repeat(x, expectedRepeat));
+
+            Assert.AreEqual(expectedMessages, resultFunc(ReceivedMessages));
+
+            // check that all messages came from local node.
+            var localNodeId = cluster.Ignite.Cluster.LocalNode.Id;
+            Assert.AreEqual(localNodeId, LastNodeIds.Distinct().Single());
+
+            AssertFailures();
+        }
+
+        /// <summary>
+        /// Gets the message listener.
+        /// </summary>
+        /// <returns>New instance of message listener.</returns>
+        public static IMessageFilter<string> GetListener()
+        {
+            return new MessageFilter<string>(Listen);
+        }
+
+        /// <summary>
+        /// Combines accumulated failures and throws an assertion, if there are any.
+        /// Clears accumulated failures.
+        /// </summary>
+        public static void AssertFailures()
+        {
+            try
+            {
+                if (Failures.Any())
+                    Assert.Fail(Failures.Reverse().Aggregate((x, y) => string.Format("{0}\n{1}", x, y)));
+            }
+            finally
+            {
+                // Assert.Fail throws, so the failures are cleared in a finally block; otherwise
+                // they would stay in the stack and fail every later check as well.
+                Failures.Clear();
+            }
+        }
+
+        /// <summary>
+        /// Listen method: records the message and its originating node, then signals the received event.
+        /// </summary>
+        /// <param name="id">Originating node ID.</param>
+        /// <param name="msg">Message.</param>
+        /// <returns>Current value of <see cref="ListenResult"/>.</returns>
+        private static bool Listen(Guid id, string msg)
+        {
+            try
+            {
+                LastNodeIds.Push(id);
+                ReceivedMessages.Push(msg);
+
+                ReceivedEvent.Signal();
+
+                return ListenResult;
+            }
+            catch (Exception ex)
+            {
+                // When executed on remote nodes, these exceptions will not go to sender,
+                // so we have to accumulate them.
+                Failures.Push(string.Format("Exception in Listen (msg: {0}, id: {1}): {2}", msg, id, ex));
+                throw;
+            }
+        }
+    }
+
+    /// <summary>
+    /// Test message filter that forwards every invocation to a delegate supplied at construction.
+    /// </summary>
+    /// <typeparam name="T">Message type.</typeparam>
+    [Serializable]
+    public class MessageFilter<T> : IMessageFilter<T>
+    {
+        /** Delegate called for every message. */
+        private readonly Func<Guid, T, bool> _invoke;
+
+        #pragma warning disable 649
+        /** Grid. Never assigned in this class; presumably injected through [InstanceResource] - verify. */
+        [InstanceResource]
+        private IIgnite _grid;
+        #pragma warning restore 649
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="MessageFilter{T}"/> class.
+        /// </summary>
+        /// <param name="invoke">The invoke delegate.</param>
+        public MessageFilter(Func<Guid, T, bool> invoke)
+        {
+            _invoke = invoke;
+        }
+
+        /** <inheritdoc /> */
+        public bool Invoke(Guid nodeId, T message)
+        {
+            // Fails if the grid field has not been populated by the time the filter is invoked.
+            Assert.IsNotNull(_grid);
+            return _invoke(nodeId, message);
+        }
+    }
+}


Mime
View raw message