ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [29/37] ignite git commit: IGNITE-1348: Moved GridGain's .Net module to Ignite.
Date Fri, 04 Sep 2015 13:32:01 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
new file mode 100644
index 0000000..680228d
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+    using System;
+    using Apache.Ignite.Core.Cache;
+    using Apache.Ignite.Core.Compute;
+    using Apache.Ignite.Core.Datastream;
+    using Apache.Ignite.Core.Events;
+    using Apache.Ignite.Core.Impl.Cache;
+    using Apache.Ignite.Core.Impl.Datastream;
+    using Apache.Ignite.Core.Impl.Portable.IO;
+    using Apache.Ignite.Core.Impl.Unmanaged;
+    using Apache.Ignite.Core.Messaging;
+
+    /// <summary>
+    /// Type descriptor with precompiled delegates for known methods.
+    /// </summary>
+    internal class DelegateTypeDescriptor
+    {
+        /** Cached descriptors. */
+        private static readonly CopyOnWriteConcurrentDictionary<Type, DelegateTypeDescriptor> Descriptors 
+            = new CopyOnWriteConcurrentDictionary<Type, DelegateTypeDescriptor>();
+
+        /** */
+        private readonly Func<object, object> _computeOutFunc;
+
+        /** */
+        private readonly Func<object, object, object> _computeFunc;
+
+        /** */
+        private readonly Func<object, Guid, object, bool> _eventFilter;
+
+        /** */
+        private readonly Func<object, object, object, bool> _cacheEntryFilter;
+
+        /** */
+        private readonly Func<object, object, object, byte, bool> _cacheDrEntryFilter;
+
+        /** */
+        private readonly Tuple<Func<object, IMutableCacheEntryInternal, object, object>, Tuple<Type, Type>> 
+            _cacheEntryProcessor;
+
+        /** */
+        private readonly Func<object, Guid, object, bool> _messageFilter;
+
+        /** */
+        private readonly Func<object, object> _computeJobExecute;
+
+        /** */
+        private readonly Action<object> _computeJobCancel;
+
+        /** */
+        private readonly Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> _streamReceiver;
+
+        /** */
+        private readonly Func<object, object> _streamTransformerCtor;
+
+        /// <summary>
+        /// Gets the <see cref="IComputeFunc{T}" /> invocator.
+        /// </summary>
+        /// <param name="type">Type.</param>
+        /// <returns>Precompiled invocator delegate.</returns>
+        public static Func<object, object> GetComputeOutFunc(Type type)
+        {
+            return Get(type)._computeOutFunc;
+        }
+
+        /// <summary>
+        /// Gets the <see cref="IComputeFunc{T, R}" /> invocator.
+        /// </summary>
+        /// <param name="type">Type.</param>
+        /// <returns>Precompiled invocator delegate.</returns>
+        public static Func<object, object, object> GetComputeFunc(Type type)
+        {
+            return Get(type)._computeFunc;
+        }
+
+        /// <summary>
+        /// Gets the <see cref="IEventFilter{T}" /> invocator.
+        /// </summary>
+        /// <param name="type">Type.</param>
+        /// <returns>Precompiled invocator delegate.</returns>
+        public static Func<object, Guid, object, bool> GetEventFilter(Type type)
+        {
+            return Get(type)._eventFilter;
+        }
+
+        /// <summary>
+        /// Gets the <see cref="ICacheEntryFilter{TK,TV}" /> invocator.
+        /// </summary>
+        /// <param name="type">Type.</param>
+        /// <returns>Precompiled invocator delegate.</returns>
+        public static Func<object, object, object, bool> GetCacheEntryFilter(Type type)
+        {
+            return Get(type)._cacheEntryFilter;
+        }
+
+        /// <summary>
+        /// Gets the <see cref="ICacheDrEntryFilter{K, V}" /> invocator.
+        /// </summary>
+        /// <param name="type">Type.</param>
+        /// <returns>Precompiled invocator delegate.</returns>
+        public static Func<object, object, object, byte, bool> GetCacheDrEntryFilter(Type type)
+        {
+            return Get(type)._cacheDrEntryFilter;
+        }
+
+        /// <summary>
+        /// Gets the <see cref="ICacheEntryProcessor{K, V, A, R}" /> invocator.
+        /// </summary>
+        /// <param name="type">Type.</param>
+        /// <returns>Precompiled invocator delegate.</returns>
+        public static Func<object, IMutableCacheEntryInternal, object, object> GetCacheEntryProcessor(Type type)
+        {
+            return Get(type)._cacheEntryProcessor.Item1;
+        }
+
+        /// <summary>
+        /// Gets key and value types for the <see cref="ICacheEntryProcessor{K, V, A, R}" />.
+        /// </summary>
+        /// <param name="type">Type.</param>
+        /// <returns>Key and value types.</returns>
+        public static Tuple<Type, Type> GetCacheEntryProcessorTypes(Type type)
+        {
+            return Get(type)._cacheEntryProcessor.Item2;
+        }
+
+        /// <summary>
+        /// Gets the <see cref="IMessageFilter{T}" /> invocator.
+        /// </summary>
+        /// <param name="type">Type.</param>
+        /// <returns>Precompiled invocator delegate.</returns>
+        public static Func<object, Guid, object, bool> GetMessageFilter(Type type)
+        {
+            return Get(type)._messageFilter;
+        }
+
+        /// <summary>
+        /// Gets the <see cref="IComputeJob{T}.Execute" /> and <see cref="IComputeJob{T}.Cancel" /> invocators.
+        /// </summary>
+        /// <param name="type">Type.</param>
+        /// <param name="execute">Execute invocator.</param>
+        /// <param name="cancel">Cancel invocator.</param>
+        public static void GetComputeJob(Type type, out Func<object, object> execute, out Action<object> cancel)
+        {
+            var desc = Get(type);
+
+            execute = desc._computeJobExecute;
+            cancel = desc._computeJobCancel;
+        }
+
+        /// <summary>
+        /// Gets the <see cref="IStreamReceiver{TK,TV}"/> invocator.
+        /// </summary>
+        /// <param name="type">Type.</param>
+        /// <returns>Precompiled invocator delegate.</returns>
+        public static Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> GetStreamReceiver(Type type)
+        {
+            return Get(type)._streamReceiver;
+        }
+
+        /// <summary>
+        /// Gets the <see cref="StreamTransformer{K, V, A, R}"/> ctor invocator.
+        /// </summary>
+        /// <param name="type">Type.</param>
+        /// <returns>Precompiled invocator delegate.</returns>
+        public static Func<object, object> GetStreamTransformerCtor(Type type)
+        {
+            return Get(type)._streamTransformerCtor;
+        }
+
+        /// <summary>
+        /// Gets the <see cref="DelegateTypeDescriptor" /> by type.
+        /// </summary>
+        private static DelegateTypeDescriptor Get(Type type)
+        {
+            DelegateTypeDescriptor result;
+
+            return Descriptors.TryGetValue(type, out result)
+                ? result
+                : Descriptors.GetOrAdd(type, t => new DelegateTypeDescriptor(t));
+        }
+
+        /// <summary>
+        /// Throws an exception if first argument is not null.
+        /// </summary>
+        // ReSharper disable once UnusedParameter.Local
+        private static void ThrowIfMultipleInterfaces(object check, Type userType, Type interfaceType)
+        {
+            if (check != null)
+                throw new InvalidOperationException(
+                    string.Format("Not Supported: Type {0} implements interface {1} multiple times.", userType,
+                        interfaceType));
+        }
+
+        /// <summary>Compiles invocators for each supported generic interface implemented by the type.</summary>
+        /// <param name="type">User type whose interfaces are scanned.</param>
+        /// <exception cref="InvalidOperationException">A supported interface is implemented more than once.
+        /// </exception>
+        private DelegateTypeDescriptor(Type type)
+        {
+            foreach (var iface in type.GetInterfaces())
+            {
+                if (!iface.IsGenericType)
+                    continue;
+
+                var genericTypeDefinition = iface.GetGenericTypeDefinition();
+
+                if (genericTypeDefinition == typeof (IComputeFunc<>))
+                {
+                    ThrowIfMultipleInterfaces(_computeOutFunc, type, typeof(IComputeFunc<>));
+
+                    _computeOutFunc = DelegateConverter.CompileFunc(iface);
+                }
+                else if (genericTypeDefinition == typeof (IComputeFunc<,>))
+                {
+                    ThrowIfMultipleInterfaces(_computeFunc, type, typeof(IComputeFunc<,>));
+
+                    var args = iface.GetGenericArguments();
+
+                    _computeFunc = DelegateConverter.CompileFunc<Func<object, object, object>>(iface, new[] {args[0]});
+                }
+                else if (genericTypeDefinition == typeof (IEventFilter<>))
+                {
+                    ThrowIfMultipleInterfaces(_eventFilter, type, typeof(IEventFilter<>));
+
+                    var args = iface.GetGenericArguments();
+
+                    _eventFilter = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface,
+                        new[] {typeof (Guid), args[0]}, new[] {false, true, false});
+                }
+                else if (genericTypeDefinition == typeof (ICacheEntryFilter<,>))
+                {
+                    ThrowIfMultipleInterfaces(_cacheEntryFilter, type, typeof(ICacheEntryFilter<,>));
+
+                    var args = iface.GetGenericArguments();
+
+                    var entryType = typeof (ICacheEntry<,>).MakeGenericType(args);
+
+                    var invokeFunc = DelegateConverter.CompileFunc<Func<object, object, bool>>(iface,
+                        new[] { entryType }, new[] { true, false });
+
+                    var ctor = DelegateConverter.CompileCtor<Func<object, object, object>>(
+                            typeof (CacheEntry<,>).MakeGenericType(args), args);
+
+                    // Resulting func constructs CacheEntry and passes it to user implementation
+                    _cacheEntryFilter = (obj, k, v) => invokeFunc(obj, ctor(k, v));
+                }
+                else if (genericTypeDefinition == typeof (ICacheEntryProcessor<,,,>))
+                {
+                    ThrowIfMultipleInterfaces(_cacheEntryProcessor, type, typeof(ICacheEntryProcessor<,,,>));
+
+                    var args = iface.GetGenericArguments();
+
+                    var entryType = typeof (IMutableCacheEntry<,>).MakeGenericType(args[0], args[1]);
+
+                    var func = DelegateConverter.CompileFunc<Func<object, object, object, object>>(iface,
+                        new[] { entryType, args[2] }, null, "Process");
+
+                    var types = new Tuple<Type, Type>(args[0], args[1]);
+
+                    _cacheEntryProcessor = new Tuple<Func<object, IMutableCacheEntryInternal, object, object>, Tuple<Type, Type>>
+                        (func, types);
+
+                    var transformerType = typeof (StreamTransformer<,,,>).MakeGenericType(args);
+
+                    _streamTransformerCtor = DelegateConverter.CompileCtor<Func<object, object>>(transformerType,
+                        new[] {iface});
+                }
+                else if (genericTypeDefinition == typeof (IMessageFilter<>))
+                {
+                    ThrowIfMultipleInterfaces(_messageFilter, type, typeof(IMessageFilter<>));
+
+                    var arg = iface.GetGenericArguments()[0];
+
+                    _messageFilter = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface,
+                        new[] { typeof(Guid), arg }, new[] { false, true, false });
+                }
+                else if (genericTypeDefinition == typeof (IComputeJob<>))
+                {
+                    ThrowIfMultipleInterfaces(_computeJobExecute, type, typeof(IComputeJob<>));
+
+                    _computeJobExecute = DelegateConverter.CompileFunc<Func<object, object>>(iface, new Type[0],
+                        methodName: "Execute");
+
+                    _computeJobCancel = DelegateConverter.CompileFunc<Action<object>>(iface, new Type[0],
+                        new[] {false}, "Cancel");
+                }
+                else if (genericTypeDefinition == typeof (IStreamReceiver<,>))
+                {
+                    ThrowIfMultipleInterfaces(_streamReceiver, type, typeof (IStreamReceiver<,>));
+
+                    var method =
+                        typeof (StreamReceiverHolder).GetMethod("InvokeReceiver")
+                            .MakeGenericMethod(iface.GetGenericArguments());
+
+                    _streamReceiver = DelegateConverter
+                        .CompileFunc<Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool>>(
+                            typeof (StreamReceiverHolder),
+                            method,
+                            new[]
+                            {
+                                iface, typeof (Ignite), typeof (IUnmanagedTarget), typeof (IPortableStream),
+                                typeof (bool)
+                            },
+                            new[] {true, false, false, false, false, false});
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
index c62cfd2..0bbc1a2 100644
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs
@@ -22,7 +22,6 @@ namespace Apache.Ignite.Core.Impl.Common
     using System.Diagnostics.CodeAnalysis;
     using System.Threading;
     using System.Threading.Tasks;
-    
     using Apache.Ignite.Core.Common;
     using Apache.Ignite.Core.Impl.Portable.IO;
 
@@ -133,7 +132,7 @@ namespace Apache.Ignite.Core.Impl.Common
         /** <inheritdoc/> */
         public void Listen(Action<IFuture<T>> callback)
         {
-            GridArgumentCheck.NotNull(callback, "callback");
+            IgniteArgumentCheck.NotNull(callback, "callback");
 
             if (!_done)
             {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/FutureConverter.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/FutureConverter.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/FutureConverter.cs
new file mode 100644
index 0000000..a07d954
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/FutureConverter.cs
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+    using System;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Portable.IO;
+
+    /// <summary>
+    /// Marshals and converts future value.
+    /// </summary>
+    internal class FutureConverter<T> : IFutureConverter<T>
+    {
+        /** Marshaller. */
+        private readonly PortableMarshaller _marsh;
+
+        /** Keep portable flag. */
+        private readonly bool _keepPortable;
+
+        /** Converting function. */
+        private readonly Func<PortableReaderImpl, T> _func;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="marsh">Marshaller.</param>
+        /// <param name="keepPortable">Keep portable.</param>
+        /// <param name="func">Converting function.</param>
+        public FutureConverter(PortableMarshaller marsh, bool keepPortable,
+            Func<PortableReaderImpl, T> func = null)
+        {
+            _marsh = marsh;
+            _keepPortable = keepPortable;
+            _func = func ?? (reader => reader.ReadObject<T>());
+        }
+
+        /// <summary>
+        /// Read and convert a value.
+        /// </summary>
+        public T Convert(IPortableStream stream)
+        {
+            var reader = _marsh.StartUnmarshal(stream, _keepPortable);
+
+            return _func(reader);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/GridArgumentCheck.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/GridArgumentCheck.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/GridArgumentCheck.cs
deleted file mode 100644
index a1fadfe..0000000
--- a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/GridArgumentCheck.cs
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Impl.Common
-{
-    using System;
-    using System.Collections.Generic;
-
-    /// <summary>
-    /// Arguments check helpers.
-    /// </summary>
-    public static class GridArgumentCheck
-    {
-        /// <summary>
-        /// Throws an ArgumentNullException if specified arg is null.
-        /// </summary>
-        /// <param name="arg">The argument.</param>
-        /// <param name="argName">Name of the argument.</param>
-        public static void NotNull(object arg, string argName)
-        {
-            if (arg == null)
-                throw new ArgumentNullException(argName);
-        }
-
-        /// <summary>
-        /// Throws an ArgumentException if specified arg is null or empty string.
-        /// </summary>
-        /// <param name="arg">The argument.</param>
-        /// <param name="argName">Name of the argument.</param>
-        public static void NotNullOrEmpty(string arg, string argName)
-        {
-            if (string.IsNullOrEmpty(arg))
-                throw new ArgumentException(string.Format("'{0}' argument should not be null or empty.", argName),
-                    argName);
-        }
-
-        /// <summary>
-        /// Throws an ArgumentException if specified arg is null or empty string.
-        /// </summary>
-        /// <param name="collection">The collection.</param>
-        /// <param name="argName">Name of the argument.</param>
-        public static void NotNullOrEmpty<T>(ICollection<T> collection, string argName)
-        {
-            if (collection == null || collection.Count == 0)
-                throw new ArgumentException(string.Format("'{0}' argument should not be null or empty.", argName),
-                    argName);
-        }
-
-        /// <summary>
-        /// Throws an ArgumentException if specified condition is false.
-        /// </summary>
-        /// <param name="condition">Condition.</param>
-        /// <param name="argName">Name of the argument.</param>
-        /// <param name="message">Message.</param>
-        public static void Ensure(bool condition, string argName, string message)
-        {
-            if (!condition)
-                throw new ArgumentException(string.Format("'{0}' argument is invalid: {1}", argName, message), 
-                    argName);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/IgniteArgumentCheck.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/IgniteArgumentCheck.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/IgniteArgumentCheck.cs
new file mode 100644
index 0000000..e94c577
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/IgniteArgumentCheck.cs
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+    using System;
+    using System.Collections.Generic;
+
+    /// <summary>
+    /// Arguments check helpers.
+    /// </summary>
+    public static class IgniteArgumentCheck
+    {
+        /// <summary>
+        /// Throws an ArgumentNullException if specified arg is null.
+        /// </summary>
+        /// <param name="arg">The argument.</param>
+        /// <param name="argName">Name of the argument.</param>
+        public static void NotNull(object arg, string argName)
+        {
+            if (arg == null)
+                throw new ArgumentNullException(argName);
+        }
+
+        /// <summary>
+        /// Throws an ArgumentException if specified arg is null or empty string.
+        /// </summary>
+        /// <param name="arg">The argument.</param>
+        /// <param name="argName">Name of the argument.</param>
+        public static void NotNullOrEmpty(string arg, string argName)
+        {
+            if (string.IsNullOrEmpty(arg))
+                throw new ArgumentException(string.Format("'{0}' argument should not be null or empty.", argName),
+                    argName);
+        }
+
+        /// <summary>
+        /// Throws an ArgumentException if specified collection is null or empty.
+        /// </summary>
+        /// <param name="collection">The collection.</param>
+        /// <param name="argName">Name of the argument.</param>
+        public static void NotNullOrEmpty<T>(ICollection<T> collection, string argName)
+        {
+            if (collection == null || collection.Count == 0)
+                throw new ArgumentException(string.Format("'{0}' argument should not be null or empty.", argName),
+                    argName);
+        }
+
+        /// <summary>
+        /// Throws an ArgumentException if specified condition is false.
+        /// </summary>
+        /// <param name="condition">Condition.</param>
+        /// <param name="argName">Name of the argument.</param>
+        /// <param name="message">Message.</param>
+        public static void Ensure(bool condition, string argName, string message)
+        {
+            if (!condition)
+                throw new ArgumentException(string.Format("'{0}' argument is invalid: {1}", argName, message), 
+                    argName);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/PortableResultWrapper.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/PortableResultWrapper.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/PortableResultWrapper.cs
new file mode 100644
index 0000000..733bed0
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Common/PortableResultWrapper.cs
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Portable;
+
+    /// <summary>
+    /// Simple wrapper over result to handle marshalling properly.
+    /// </summary>
+    internal class PortableResultWrapper : IPortableWriteAware
+    {
+        /** */
+        private readonly object _result;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="PortableResultWrapper"/> class.
+        /// </summary>
+        /// <param name="reader">The reader.</param>
+        public PortableResultWrapper(IPortableReader reader)
+        {
+            var reader0 = (PortableReaderImpl)reader.RawReader();
+
+            _result = PortableUtils.ReadPortableOrSerializable<object>(reader0);
+        }
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="res">Result.</param>
+        public PortableResultWrapper(object res)
+        {
+            _result = res;
+        }
+
+        /// <summary>
+        /// Result.
+        /// </summary>
+        public object Result
+        {
+            get { return _result; }
+        }
+
+        /** <inheritDoc /> */
+        public void WritePortable(IPortableWriter writer)
+        {
+            var writer0 = (PortableWriterImpl) writer.RawWriter();
+
+            writer0.DetachNext();
+            PortableUtils.WritePortableOrSerializable(writer0, Result);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeAbstractClosureTask.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeAbstractClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeAbstractClosureTask.cs
new file mode 100644
index 0000000..1a772c2
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeAbstractClosureTask.cs
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+    using System;
+    using System.Collections.Generic;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Compute;
+
+    /// <summary>
+    /// Common base for closure tasks: rejects the map step and pre-filters failed job results.
+    /// </summary>
+    internal abstract class ComputeAbstractClosureTask<TA, T, TR> : IComputeTask<TA, T, TR>
+    {
+        /// <summary>
+        /// Map step of the task, which normally maps or splits a task into multiple jobs.
+        /// Closure tasks do not support it, so this implementation never produces an
+        /// assignment of jobs to nodes and throws unconditionally.
+        /// </summary>
+        /// <param name="subgrid">Nodes available for this task execution. The list is not
+        /// inspected by this implementation.</param>
+        /// <param name="arg">Task execution argument. Can be <c>null</c>. This is the same argument
+        /// as the one passed into <c>ICompute.Execute()</c> methods. The value is not inspected
+        /// by this implementation.</param>
+        /// <returns>
+        /// Nothing; the method always throws.
+        /// </returns>
+        /// <exception cref="System.NotSupportedException">Always thrown: the map step must not be
+        /// called on this task.</exception>
+        public IDictionary<IComputeJob<T>, IClusterNode> Map(IList<IClusterNode> subgrid, TA arg)
+        {
+            throw new NotSupportedException("Map step should not be called on this task.");
+        }
+
+        /// <summary>
+        /// Callback invoked every time a result from remote execution is received. A result
+        /// carrying no exception is forwarded to <see cref="Result0" />. A result carrying
+        /// <see cref="ComputeExecutionRejectedException" />, <see cref="ClusterTopologyException" />
+        /// or <see cref="ComputeJobFailoverException" /> fails the job over; any other exception
+        /// is thrown from this method.
+        /// </summary>
+        /// <param name="res">Received remote Ignite executable result.</param>
+        /// <param name="rcvd">All previously received results. This implementation does not
+        /// read the list.</param>
+        /// <returns>
+        /// Result policy that dictates how to process further upcoming job results.
+        /// </returns>
+        public ComputeJobResultPolicy Result(IComputeJobResult<T> res, IList<IComputeJobResult<T>> rcvd)
+        {
+            Exception failure = res.Exception();
+
+            if (failure == null)
+                return Result0(res);
+
+            bool failover = failure is ComputeExecutionRejectedException ||
+                failure is ClusterTopologyException || failure is ComputeJobFailoverException;
+
+            if (failover)
+                return ComputeJobResultPolicy.Failover;
+
+            throw failure;
+        }
+
+        /// <summary>
+        /// Reduces (or aggregates) the results collected so far into the single compound result
+        /// that is handed back to the caller via future.
+        /// <para />
+        /// Note that if some jobs did not succeed and could not be failed over, the list of
+        /// results passed into this method will include the failed results. Otherwise, failed
+        /// results will not be in the list.
+        /// </summary>
+        /// <param name="results">Received job results. Note that if task class has
+        /// <see cref="ComputeTaskNoResultCacheAttribute" /> attribute, then this list will be empty.</param>
+        /// <returns>
+        /// Task result constructed from results of remote executions.
+        /// </returns>
+        public abstract TR Reduce(IList<IComputeJobResult<T>> results);
+
+        /// <summary>
+        /// Task-specific handling of a job result that carries no exception.
+        /// </summary>
+        /// <param name="res">Job result without an exception.</param>
+        /// <returns>Policy telling the system how to proceed.</returns>
+        protected abstract ComputeJobResultPolicy Result0(IComputeJobResult<T> res);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs
new file mode 100644
index 0000000..c91a167
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeActionJob.cs
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+    using System;
+    using Apache.Ignite.Core.Compute;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Resource;
+    using Apache.Ignite.Core.Portable;
+
+    /// <summary>
+    /// System job which wraps over <c>Action</c>.
+    /// </summary>
+    internal class ComputeActionJob : IComputeJob, IComputeResourceInjector, IPortableWriteAware
+    {
+        /** Wrapped action. */
+        private readonly IComputeAction _action;
+
+        /// <summary>
+        /// Creates a job around the given action.
+        /// </summary>
+        /// <param name="action">Action to invoke when the job is executed.</param>
+        public ComputeActionJob(IComputeAction action)
+        {
+            _action = action;
+        }
+
+        /** <inheritDoc /> */
+        public object Execute()
+        {
+            _action.Invoke();
+
+            return null; // An action produces no value.
+        }
+
+        /** <inheritDoc /> */
+        public void Cancel()
+        {
+            throw new NotSupportedException("Action job cannot be cancelled.");
+        }
+
+        /** <inheritDoc /> */
+        public void Inject(Ignite grid)
+        {
+            ResourceProcessor.Inject(_action, grid);
+        }
+
+        /** <inheritDoc /> */
+        public void WritePortable(IPortableWriter writer)
+        {
+            var writer0 = (PortableWriterImpl)writer.RawWriter();
+
+            writer0.DetachNext();
+            PortableUtils.WritePortableOrSerializable(writer0, _action);
+        }
+
+        /// <summary>
+        /// Creates a job by reading the wrapped action from the raw reader.
+        /// </summary>
+        /// <param name="reader">Reader to take the raw reader from.</param>
+        public ComputeActionJob(IPortableReader reader)
+        {
+            var reader0 = (PortableReaderImpl)reader.RawReader();
+
+            _action = PortableUtils.ReadPortableOrSerializable<IComputeAction>(reader0);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs
new file mode 100644
index 0000000..381c701
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeFuncJob.cs
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+    using System;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Resource;
+    using Apache.Ignite.Core.Portable;
+
+    /// <summary>
+    /// System job wrapping a <c>Func</c> closure together with the argument it is invoked with.
+    /// </summary>
+    internal class ComputeFuncJob : IComputeJob, IComputeResourceInjector, IPortableWriteAware
+    {
+        /** Wrapped closure. */
+        private readonly IComputeFunc _clo;
+
+        /** Argument passed to the closure. */
+        private readonly object _arg;
+
+        /// <summary>
+        /// Creates a job from a closure and its argument.
+        /// </summary>
+        /// <param name="clo">Closure to invoke.</param>
+        /// <param name="arg">Argument handed to the closure.</param>
+        public ComputeFuncJob(IComputeFunc clo, object arg)
+        {
+            _arg = arg;
+            _clo = clo;
+        }
+
+        /** <inheritDoc /> */
+        public object Execute()
+        {
+            return _clo.Invoke(_arg);
+        }
+
+        /** <inheritDoc /> */
+        public void Cancel()
+        {
+            throw new NotSupportedException("Func job cannot be cancelled.");
+        }
+
+        /** <inheritDoc /> */
+        public void Inject(Ignite grid)
+        {
+            ResourceProcessor.Inject(_clo, grid);
+        }
+
+        /** <inheritDoc /> */
+        public void WritePortable(IPortableWriter writer)
+        {
+            var rawWriter = (PortableWriterImpl)writer.RawWriter();
+
+            // Closure goes first, argument second; the reading constructor relies on this order.
+            rawWriter.DetachNext();
+            PortableUtils.WritePortableOrSerializable(rawWriter, _clo);
+            rawWriter.DetachNext();
+            PortableUtils.WritePortableOrSerializable(rawWriter, _arg);
+        }
+
+        /// <summary>
+        /// Creates a job by reading the closure and then its argument from the raw reader.
+        /// </summary>
+        /// <param name="reader">Reader to take the raw reader from.</param>
+        public ComputeFuncJob(IPortableReader reader)
+        {
+            PortableReaderImpl rawReader = (PortableReaderImpl)reader.RawReader();
+
+            _clo = PortableUtils.ReadPortableOrSerializable<IComputeFunc>(rawReader);
+            _arg = PortableUtils.ReadPortableOrSerializable<object>(rawReader);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeMultiClosureTask.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeMultiClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeMultiClosureTask.cs
new file mode 100644
index 0000000..dd57f6c
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeMultiClosureTask.cs
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+    using System.Collections.Generic;
+    using Apache.Ignite.Core.Compute;
+
+    /// <summary>
+    /// Closure-based task that accumulates the data of every job result into one collection.
+    /// </summary>
+    [ComputeTaskNoResultCache]
+    internal class ComputeMultiClosureTask<TA, T, TR> : ComputeAbstractClosureTask<TA, T, TR>
+        where TR : ICollection<T>
+    {
+        /** Accumulated job results. */
+        private readonly ICollection<T> _res;
+
+        /// <summary>
+        /// Creates the task with a result list pre-sized for the expected number of results.
+        /// </summary>
+        /// <param name="size">Expected results count; used as the initial list capacity.</param>
+        public ComputeMultiClosureTask(int size)
+        {
+            _res = new List<T>(size);
+        }
+
+        /** <inheritDoc /> */
+        protected override ComputeJobResultPolicy Result0(IComputeJobResult<T> res)
+        {
+            var data = res.Data();
+            _res.Add(data);
+            return ComputeJobResultPolicy.Wait;
+        }
+
+        /** <inheritDoc /> */
+        public override TR Reduce(IList<IComputeJobResult<T>> results)
+        {
+            return (TR)_res; // The accumulated List<T> itself is handed out, viewed as TR.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs
new file mode 100644
index 0000000..5f719cd
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeOutFuncJob.cs
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+    using System;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Resource;
+    using Apache.Ignite.Core.Portable;
+
+    /// <summary>
+    /// System job wrapping a closure that takes no argument and produces a value.
+    /// </summary>
+    internal class ComputeOutFuncJob : IComputeJob, IComputeResourceInjector, IPortableWriteAware
+    {
+        /** Wrapped closure. */
+        private readonly IComputeOutFunc _clo;
+
+        /// <summary>
+        /// Creates a job around the given closure.
+        /// </summary>
+        /// <param name="clo">Closure to invoke when the job is executed.</param>
+        public ComputeOutFuncJob(IComputeOutFunc clo)
+        {
+            _clo = clo;
+        }
+
+        /** <inheritDoc /> */
+        public object Execute()
+        {
+            return _clo.Invoke();
+        }
+
+        /** <inheritDoc /> */
+        public void Cancel()
+        {
+            throw new NotSupportedException("Func job cannot be cancelled.");
+        }
+
+        /** <inheritDoc /> */
+        public void Inject(Ignite grid)
+        {
+            ResourceProcessor.Inject(_clo, grid);
+        }
+
+        /** <inheritDoc /> */
+        public void WritePortable(IPortableWriter writer)
+        {
+            PortableWriterImpl rawWriter = (PortableWriterImpl)writer.RawWriter();
+            rawWriter.DetachNext();
+            PortableUtils.WritePortableOrSerializable(rawWriter, _clo);
+        }
+
+        /// <summary>Creates a job by reading the wrapped closure from the raw reader.</summary>
+        /// <param name="reader">Reader to take the raw reader from.</param>
+        public ComputeOutFuncJob(IPortableReader reader)
+        {
+            PortableReaderImpl rawReader = (PortableReaderImpl)reader.RawReader();
+            _clo = PortableUtils.ReadPortableOrSerializable<IComputeOutFunc>(rawReader);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs
new file mode 100644
index 0000000..a84d7ce
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeReducingClosureTask.cs
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+    using System.Collections.Generic;
+    using Apache.Ignite.Core.Compute;
+    using Apache.Ignite.Core.Impl.Resource;
+
+    /// <summary>
+    /// Closure-based task that feeds every job result into a reducer and returns the reducer's outcome.
+    /// </summary>
+    [ComputeTaskNoResultCache]
+    internal class ComputeReducingClosureTask<TA, T, TR> : ComputeAbstractClosureTask<TA, T, TR>,
+        IComputeResourceInjector
+    {
+        /** Reducer collecting job results. */
+        private readonly IComputeReducer<T, TR> _rdc;
+
+        /// <summary>
+        /// Creates the task around the given reducer.
+        /// </summary>
+        /// <param name="rdc">Reducer that collects job results and produces the task result.</param>
+        public ComputeReducingClosureTask(IComputeReducer<T, TR> rdc)
+        {
+            _rdc = rdc;
+        }
+
+        /** <inheritDoc /> */
+        protected override ComputeJobResultPolicy Result0(IComputeJobResult<T> res)
+        {
+            return !_rdc.Collect(res.Data()) ? ComputeJobResultPolicy.Reduce : ComputeJobResultPolicy.Wait;
+        }
+
+        /** <inheritDoc /> */
+        public override TR Reduce(IList<IComputeJobResult<T>> results)
+        {
+            return _rdc.Reduce();
+        }
+
+        /** <inheritDoc /> */
+        public void Inject(Ignite grid)
+        {
+            ResourceProcessor.Inject(_rdc, grid);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs
new file mode 100644
index 0000000..6e82c9b
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/ComputeSingleClosureTask.cs
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+    using System.Collections.Generic;
+    using Apache.Ignite.Core.Compute;
+
+    /// <summary>
+    /// Closure-based task that remembers the data of a job result and returns it on reduce.
+    /// </summary>
+    [ComputeTaskNoResultCache]
+    internal class ComputeSingleClosureTask<TA, T, TR> : ComputeAbstractClosureTask<TA, T, TR> where TR : T
+    {
+        /** Data of the most recently processed job result. */
+        private TR _res;
+
+        /** <inheritDoc /> */
+        protected override ComputeJobResultPolicy Result0(IComputeJobResult<T> res)
+        {
+            var data = res.Data();
+            _res = (TR)data;
+
+            // No further results are expected, yet the regular task flow is kept: wait.
+            return ComputeJobResultPolicy.Wait;
+        }
+
+        /** <inheritDoc /> */
+        public override TR Reduce(IList<IComputeJobResult<T>> results)
+        {
+            return _res;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs
new file mode 100644
index 0000000..8d3e8d7
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Closure/IComputeResourceInjector.cs
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute.Closure
+{
+    /// <summary>
+    /// Implemented by compute entities which perform resource injection themselves.
+    /// </summary>
+    internal interface IComputeResourceInjector
+    {
+        /// <summary>
+        /// Injects resources using the given grid instance.
+        /// </summary>
+        /// <param name="grid">Grid instance to use for the injection.</param>
+        void Inject(Ignite grid);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
new file mode 100644
index 0000000..7efabd1
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Compute;
+
+    /// <summary>
+    /// Synchronous Compute facade over <see cref="ComputeImpl"/>; task and closure calls return via <c>Get()</c>.
+    /// </summary>
+    internal class Compute : ICompute
+    {
+        /** Implementation that all calls are delegated to. */
+        private readonly ComputeImpl _compute;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Compute"/> class.
+        /// </summary>
+        /// <param name="computeImpl">The compute implementation; must not be null (debug-asserted).</param>
+        public Compute(ComputeImpl computeImpl)
+        {
+            Debug.Assert(computeImpl != null);
+
+            _compute = computeImpl;
+        }
+
+        /** <inheritDoc /> */
+        public ICompute WithAsync()
+        {
+            return new ComputeAsync(_compute);
+        }
+
+        /** <inheritDoc /> */
+        public bool IsAsync
+        {
+            get { return false; }
+        }
+
+        /** <inheritDoc /> This facade always throws here (IsAsync is false). */
+        public IFuture GetFuture()
+        {
+            throw IgniteUtils.GetAsyncModeDisabledException();
+        }
+
+        /** <inheritDoc /> This facade always throws here (IsAsync is false). */
+        public IFuture<TResult> GetFuture<TResult>()
+        {
+            throw IgniteUtils.GetAsyncModeDisabledException();
+        }
+
+        /** <inheritDoc /> */
+        public IClusterGroup ClusterGroup
+        {
+            get { return _compute.ClusterGroup; }
+        }
+
+        /** <inheritDoc /> */
+        public ICompute WithNoFailover()
+        {
+            _compute.WithNoFailover();
+
+            return this;
+        }
+
+        /** <inheritDoc /> */
+        public ICompute WithTimeout(long timeout)
+        {
+            _compute.WithTimeout(timeout);
+
+            return this;
+        }
+
+        /** <inheritDoc /> */
+        public ICompute WithKeepPortable()
+        {
+            _compute.WithKeepPortable();
+
+            return this;
+        }
+
+        /** <inheritDoc /> */
+        public T ExecuteJavaTask<T>(string taskName, object taskArg)
+        {
+            return _compute.ExecuteJavaTask<T>(taskName, taskArg);
+        }
+
+        /** <inheritDoc /> */
+        public TR Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg)
+        {
+            return _compute.Execute(task, taskArg).Get();
+        }
+
+        /** <inheritDoc /> */
+        public TR Execute<T, TR>(IComputeTask<T, TR> task)
+        {
+            return _compute.Execute(task, null).Get();
+        }
+
+        /** <inheritDoc /> */
+        public TR Execute<TA, T, TR>(Type taskType, TA taskArg)
+        {
+            return _compute.Execute<TA, T, TR>(taskType, taskArg).Get();
+        }
+
+        public TR Execute<T, TR>(Type taskType)
+        {
+            return _compute.Execute<object, T, TR>(taskType, null).Get();
+        }
+
+        /** <inheritDoc /> */
+        public TR Call<TR>(IComputeFunc<TR> clo)
+        {
+            return _compute.Execute(clo).Get();
+        }
+
+        /** <inheritDoc /> */
+        public TR AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo)
+        {
+            return _compute.AffinityCall(cacheName, affinityKey, clo).Get();
+        }
+
+        /** <inheritDoc /> */
+        public TR Call<TR>(Func<TR> func)
+        {
+            return _compute.Execute(func).Get();
+        }
+
+        /** <inheritDoc /> */
+        public ICollection<TR> Call<TR>(IEnumerable<IComputeFunc<TR>> clos)
+        {
+            return _compute.Execute(clos).Get();
+        }
+
+        /** <inheritDoc /> */
+        public TR2 Call<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc)
+        {
+            return _compute.Execute(clos, rdc).Get();
+        }
+
+        /** <inheritDoc /> */
+        public ICollection<TR> Broadcast<TR>(IComputeFunc<TR> clo)
+        {
+            return _compute.Broadcast(clo).Get();
+        }
+
+        /** <inheritDoc /> */
+        public ICollection<TR> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg)
+        {
+            return _compute.Broadcast(clo, arg).Get();
+        }
+
+        /** <inheritDoc /> */
+        public void Broadcast(IComputeAction action)
+        {
+            _compute.Broadcast(action).Get();
+        }
+
+        /** <inheritDoc /> */
+        public void Run(IComputeAction action)
+        {
+            _compute.Run(action).Get();
+        }
+
+        /** <inheritDoc /> */
+        public void AffinityRun(string cacheName, object affinityKey, IComputeAction action)
+        {
+            _compute.AffinityRun(cacheName, affinityKey, action).Get();
+        }
+
+        /** <inheritDoc /> */
+        public void Run(IEnumerable<IComputeAction> actions)
+        {
+            _compute.Run(actions).Get();
+        }
+
+        /** <inheritDoc /> */
+        public TR Apply<T, TR>(IComputeFunc<T, TR> clo, T arg)
+        {
+            return _compute.Apply(clo, arg).Get();
+        }
+
+        /** <inheritDoc /> */
+        public ICollection<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args)
+        {
+            return _compute.Apply(clo, args).Get();
+        }
+
+        /** <inheritDoc /> */
+        public TR2 Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args, IComputeReducer<TR1, TR2> rdc)
+        {
+            return _compute.Apply(clo, args, rdc).Get();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs
new file mode 100644
index 0000000..199afc2
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics.CodeAnalysis;
+    using System.Threading;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Compute;
+
+    /// <summary>
+    /// Asynchronous Compute facade.
+    /// <para />
+    /// Every compute operation delegates to the wrapped <see cref="ComputeImpl"/>, stores the future it returns
+    /// in a thread-local slot and returns a default value right away. The stored future is handed out by
+    /// <see cref="GetFuture{TResult}"/>, which also clears the slot.
+    /// </summary>
+    [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
+    internal class ComputeAsync : ICompute
+    {
+        /** Underlying compute implementation all operations are delegated to. */
+        protected readonly ComputeImpl Compute;
+
+        /** Current future (one slot per calling thread). */
+        private readonly ThreadLocal<IFuture> _curFut = new ThreadLocal<IFuture>();
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ComputeAsync"/> class.
+        /// </summary>
+        /// <param name="computeImpl">The compute implementation.</param>
+        internal ComputeAsync(ComputeImpl computeImpl)
+        {
+            Compute = computeImpl;
+        }
+
+        /** <inheritDoc /> */
+        public ICompute WithAsync()
+        {
+            // Already asynchronous.
+            return this;
+        }
+
+        /** <inheritDoc /> */
+        public bool IsAsync
+        {
+            get { return true; }
+        }
+
+        /** <inheritDoc /> */
+        public IFuture GetFuture()
+        {
+            return GetFuture<object>();
+        }
+
+        /// <summary>
+        /// Gets the future stored by the last operation invoked on the current thread and clears the slot.
+        /// </summary>
+        /// <typeparam name="TResult">Expected future result type.</typeparam>
+        /// <returns>Future of the last operation started on this thread.</returns>
+        /// <exception cref="InvalidOperationException">
+        /// No future is stored for the current thread, or the stored future is not an
+        /// <see cref="IFuture{TResult}"/>. In the latter case the stored future is left in place.
+        /// </exception>
+        public IFuture<TResult> GetFuture<TResult>()
+        {
+            var fut = _curFut.Value;
+
+            if (fut == null)
+                throw new InvalidOperationException("Asynchronous operation not started.");
+
+            var fut0 = fut as IFuture<TResult>;
+
+            if (fut0 == null)
+                throw new InvalidOperationException(
+                    string.Format("Requested future type {0} is incompatible with current future type {1}",
+                        typeof(IFuture<TResult>), fut.GetType()));
+
+            // Clear the slot only after the type check succeeded, so a mismatched request does not lose the future.
+            _curFut.Value = null;
+
+            return fut0;
+        }
+
+        /** <inheritDoc /> */
+        public IClusterGroup ClusterGroup
+        {
+            get { return Compute.ClusterGroup; }
+        }
+
+        /** <inheritDoc /> */
+        public ICompute WithNoFailover()
+        {
+            Compute.WithNoFailover();
+
+            return this;
+        }
+
+        /** <inheritDoc /> */
+        public ICompute WithTimeout(long timeout)
+        {
+            Compute.WithTimeout(timeout);
+
+            return this;
+        }
+
+        /** <inheritDoc /> */
+        public ICompute WithKeepPortable()
+        {
+            Compute.WithKeepPortable();
+
+            return this;
+        }
+
+        /** <inheritDoc /> */
+        public T ExecuteJavaTask<T>(string taskName, object taskArg)
+        {
+            _curFut.Value = Compute.ExecuteJavaTaskAsync<T>(taskName, taskArg);
+
+            return default(T);
+        }
+
+        /** <inheritDoc /> */
+        public TR Execute<TA, T, TR>(IComputeTask<TA, T, TR> task, TA taskArg)
+        {
+            _curFut.Value = Compute.Execute(task, taskArg);
+
+            return default(TR);
+        }
+
+        /** <inheritDoc /> */
+        public TR Execute<T, TR>(IComputeTask<T, TR> task)
+        {
+            _curFut.Value = Compute.Execute(task, null);
+
+            return default(TR);
+        }
+
+        /** <inheritDoc /> */
+        public TR Execute<TA, T, TR>(Type taskType, TA taskArg)
+        {
+            _curFut.Value = Compute.Execute<TA, T, TR>(taskType, taskArg);
+
+            return default(TR);
+        }
+
+        /** <inheritDoc /> */
+        public TR Execute<T, TR>(Type taskType)
+        {
+            _curFut.Value = Compute.Execute<object, T, TR>(taskType, null);
+
+            return default(TR);
+        }
+
+        /** <inheritDoc /> */
+        public TR Call<TR>(IComputeFunc<TR> clo)
+        {
+            _curFut.Value = Compute.Execute(clo);
+
+            return default(TR);
+        }
+
+        /** <inheritDoc /> */
+        public TR AffinityCall<TR>(string cacheName, object affinityKey, IComputeFunc<TR> clo)
+        {
+            // The future must be stored like in every other operation; otherwise GetFuture() either throws
+            // or returns a stale future left by an earlier operation on this thread.
+            _curFut.Value = Compute.AffinityCall(cacheName, affinityKey, clo);
+
+            return default(TR);
+        }
+
+        /** <inheritDoc /> */
+        public TR Call<TR>(Func<TR> func)
+        {
+            _curFut.Value = Compute.Execute(func);
+
+            return default(TR);
+        }
+
+        /** <inheritDoc /> */
+        public ICollection<TR> Call<TR>(IEnumerable<IComputeFunc<TR>> clos)
+        {
+            _curFut.Value = Compute.Execute(clos);
+
+            return null;
+        }
+
+        /** <inheritDoc /> */
+        public TR2 Call<TR1, TR2>(IEnumerable<IComputeFunc<TR1>> clos, IComputeReducer<TR1, TR2> rdc)
+        {
+            _curFut.Value = Compute.Execute(clos, rdc);
+
+            return default(TR2);
+        }
+
+        /** <inheritDoc /> */
+        public ICollection<TR> Broadcast<TR>(IComputeFunc<TR> clo)
+        {
+            _curFut.Value = Compute.Broadcast(clo);
+
+            return null;
+        }
+
+        /** <inheritDoc /> */
+        public ICollection<TR> Broadcast<T, TR>(IComputeFunc<T, TR> clo, T arg)
+        {
+            _curFut.Value = Compute.Broadcast(clo, arg);
+
+            return null;
+        }
+
+        /** <inheritDoc /> */
+        public void Broadcast(IComputeAction action)
+        {
+            _curFut.Value = Compute.Broadcast(action);
+        }
+
+        /** <inheritDoc /> */
+        public void Run(IComputeAction action)
+        {
+            _curFut.Value = Compute.Run(action);
+        }
+
+        /** <inheritDoc /> */
+        public void AffinityRun(string cacheName, object affinityKey, IComputeAction action)
+        {
+            // The future must be stored like in every other operation; otherwise GetFuture() either throws
+            // or returns a stale future left by an earlier operation on this thread.
+            _curFut.Value = Compute.AffinityRun(cacheName, affinityKey, action);
+        }
+
+        /** <inheritDoc /> */
+        public void Run(IEnumerable<IComputeAction> actions)
+        {
+            _curFut.Value = Compute.Run(actions);
+        }
+
+        /** <inheritDoc /> */
+        public TR Apply<T, TR>(IComputeFunc<T, TR> clo, T arg)
+        {
+            _curFut.Value = Compute.Apply(clo, arg);
+
+            return default(TR);
+        }
+
+        /** <inheritDoc /> */
+        public ICollection<TR> Apply<T, TR>(IComputeFunc<T, TR> clo, IEnumerable<T> args)
+        {
+            _curFut.Value = Compute.Apply(clo, args);
+
+            return null;
+        }
+
+        /** <inheritDoc /> */
+        public TR2 Apply<T, TR1, TR2>(IComputeFunc<T, TR1> clo, IEnumerable<T> args, IComputeReducer<TR1, TR2> rdc)
+        {
+            _curFut.Value = Compute.Apply(clo, args, rdc);
+
+            return default(TR2);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs
new file mode 100644
index 0000000..a971418
--- /dev/null
+++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeFunc.cs
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Compute
+{
+    using System;
+    using System.Reflection;
+    using Apache.Ignite.Core.Compute;
+    using Apache.Ignite.Core.Impl.Common;
+    using Apache.Ignite.Core.Impl.Portable;
+    using Apache.Ignite.Core.Impl.Resource;
+    using Apache.Ignite.Core.Portable;
+    using Apache.Ignite.Core.Resource;
+
+    /// <summary>
+    /// Non-generic version of IComputeFunc{T, TR}: both the argument and the result are typed as object.
+    /// </summary>
+    internal interface IComputeFunc : IComputeFunc<object, object>
+    {
+        // No-op: declares no members of its own, everything comes from the base interface.
+    }
+
+    /// <summary>
+    /// Non-generic adapter around a generic compute func, intended for internal usage.
+    /// </summary>
+    internal class ComputeFuncWrapper : IComputeFunc, IPortableWriteAware
+    {
+        /** Wrapped function object. */
+        private readonly object _func;
+
+        /** Invoker receiving (wrapped function, argument) and producing the result. */
+        private readonly Func<object, object, object> _invoker;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ComputeFuncWrapper" /> class
+        /// from a function object and a ready-made invoker.
+        /// </summary>
+        /// <param name="func">The function to wrap.</param>
+        /// <param name="invoker">The function invoker; receives the argument only.</param>
+        public ComputeFuncWrapper(object func, Func<object, object> invoker)
+        {
+            // The supplied invoker takes just the argument, so the target slot is ignored here.
+            _invoker = (unusedTarget, input) => invoker(input);
+
+            _func = func;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ComputeFuncWrapper"/> class
+        /// by reading the wrapped function from a portable reader.
+        /// </summary>
+        /// <param name="reader">The reader.</param>
+        public ComputeFuncWrapper(IPortableReader reader)
+        {
+            var rawReader = (PortableReaderImpl)reader.RawReader();
+
+            var func = PortableUtils.ReadPortableOrSerializable<object>(rawReader);
+
+            _func = func;
+
+            // The invoker is looked up by the runtime type of the function that was just read.
+            _invoker = DelegateTypeDescriptor.GetComputeFunc(func.GetType());
+        }
+
+        /** <inheritDoc /> */
+        public object Invoke(object arg)
+        {
+            object res;
+
+            try
+            {
+                res = _invoker(_func, arg);
+            }
+            catch (TargetInvocationException e)
+            {
+                // Rethrow the wrapped exception instead of the invocation wrapper.
+                throw e.InnerException;
+            }
+
+            return res;
+        }
+
+        /** <inheritDoc /> */
+        public void WritePortable(IPortableWriter writer)
+        {
+            var rawWriter = (PortableWriterImpl)writer.RawWriter();
+
+            rawWriter.DetachNext();
+
+            PortableUtils.WritePortableOrSerializable(rawWriter, _func);
+        }
+
+        /// <summary>
+        /// Injects the Ignite instance into the wrapped function.
+        /// </summary>
+        /// <param name="ignite">Ignite instance; cast to <see cref="IgniteProxy"/> before injection.</param>
+        [InstanceResource]
+        public void InjectIgnite(IIgnite ignite)
+        {
+            // Propagate injection to the wrapped function.
+            var proxy = (IgniteProxy) ignite;
+
+            ResourceProcessor.Inject(_func, proxy);
+        }
+    }
+    
+    /// <summary>
+    /// Extension methods for IComputeFunc{T, TR}.
+    /// </summary>
+    internal static class ComputeFuncExtensions
+    {
+        /// <summary>
+        /// Wraps a generic func into its non-generic counterpart.
+        /// </summary>
+        /// <param name="func">Generic func to wrap.</param>
+        /// <returns>A <see cref="ComputeFuncWrapper"/> that casts its argument to T and calls the func.</returns>
+        public static IComputeFunc ToNonGeneric<T, TR>(this IComputeFunc<T, TR> func)
+        {
+            // The wrapper passes an untyped argument; cast it back to T before calling the typed func.
+            Func<object, object> invoker = arg => func.Invoke((T) arg);
+
+            return new ComputeFuncWrapper(func, invoker);
+        }
+    }
+}


Mime
View raw message