ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [38/46] ignite git commit: IGNITE-3511: .NET: Fixed AffinityFunctionBase.cs placement, added missing Package-Info.cs files.
Date Wed, 20 Jul 2016 09:07:23 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs
new file mode 100644
index 0000000..b6c190c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity
+{
+    using System;
+    using System.Collections.Generic;
+    using Apache.Ignite.Core.Cache.Affinity.Fair;
+    using Apache.Ignite.Core.Cache.Affinity.Rendezvous;
+    using Apache.Ignite.Core.Cluster;
+
+    /// <summary>
+    /// Represents a function that maps cache keys to cluster nodes.
+    /// <para />
+    /// Predefined implementations: 
+    /// <see cref="RendezvousAffinityFunction"/>, <see cref="FairAffinityFunction"/>.
+    /// </summary>
+    public interface IAffinityFunction
+    {
+        /// <summary>
+        /// Gets the total number of partitions.
+        /// <para />
+        /// All caches should always provide correct partition count which should be the same on all 
+        /// participating nodes. Note that partitions should always be numbered from 0 inclusively 
+        /// to N exclusively without any gaps.
+        /// </summary>
+        int Partitions { get; }
+
+        /// <summary>
+        /// Gets partition number for a given key starting from 0. Partitioned caches
+        /// should make sure that keys are about evenly distributed across all partitions
+        /// from 0 to <see cref="Partitions"/> for best performance.
+        /// <para />
+        /// Note that for fully replicated caches it is possible to segment key sets among different
+        /// grid node groups. In that case each node group should return a unique partition
+        /// number. However, unlike partitioned cache, mappings of keys to nodes in
+        /// replicated caches are constant and a node cannot migrate from one partition
+        /// to another.
+        /// </summary>
+        /// <param name="key">Key to get partition for.</param>
+        /// <returns>Partition number for a given key.</returns>
+        int GetPartition(object key);
+
+        /// <summary>
+        /// Removes node from affinity. This method is called when it is safe to remove 
+        /// disconnected node from affinity mapping.
+        /// </summary>
+        /// <param name="nodeId">The node identifier.</param>
+        void RemoveNode(Guid nodeId);
+
+        /// <summary>
+        /// Gets affinity nodes for a partition. In case of replicated cache, all returned
+        /// nodes are updated in the same manner. In case of partitioned cache, the returned
+        /// list should contain only the primary and back up nodes with primary node being
+        /// always first.
+        /// <pare />
+        /// Note that partitioned affinity must obey the following contract: given that node
+        /// <code>N</code> is primary for some key <code>K</code>, if any other node(s) leave
+        /// grid and no node joins grid, node <code>N</code> will remain primary for key <code>K</code>.
+        /// </summary>
+        /// <param name="context">The affinity function context.</param>
+        /// <returns>
+        /// A collection of partitions, where each partition is a collection of nodes,
+        /// where first node is a primary node, and other nodes are backup nodes.
+        /// </returns>
+        IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Package-Info.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Package-Info.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Package-Info.cs
new file mode 100644
index 0000000..dfbdf08
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Package-Info.cs
@@ -0,0 +1,26 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#pragma warning disable 1587   // invalid XML comment
+
+/// <summary>
+/// Cache affinity API.
+/// </summary>
+namespace Apache.Ignite.Core.Cache.Affinity
+{
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/Package-Info.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/Package-Info.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/Package-Info.cs
new file mode 100644
index 0000000..4b7f9af
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/Package-Info.cs
@@ -0,0 +1,26 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#pragma warning disable 1587   // invalid XML comment
+
+/// <summary>
+/// Rendezvous affinity API.
+/// </summary>
+namespace Apache.Ignite.Core.Cache.Affinity.Rendezvous
+{
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs
new file mode 100644
index 0000000..928324c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity.Rendezvous
+{
+    using System;
+    using Apache.Ignite.Core.Impl.Cache.Affinity;
+
+    /// <summary>
+    /// Affinity function for partitioned cache based on Highest Random Weight algorithm.
+    /// </summary>
+    [Serializable]
+    public class RendezvousAffinityFunction : AffinityFunctionBase
+    {
+        // No-op.
+        // Actual implementation is in Java, see AffinityFunctionSerializer.Write method.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/Package-Info.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/Package-Info.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/Package-Info.cs
new file mode 100644
index 0000000..fb8ed61
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/Package-Info.cs
@@ -0,0 +1,26 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#pragma warning disable 1587   // invalid XML comment
+
+/// <summary>
+/// Ignite Configuration API.
+/// </summary>
+namespace Apache.Ignite.Core.Cache.Configuration
+{
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Eviction/Package-Info.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Eviction/Package-Info.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Eviction/Package-Info.cs
new file mode 100644
index 0000000..13f56a8
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Eviction/Package-Info.cs
@@ -0,0 +1,26 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#pragma warning disable 1587   // invalid XML comment
+
+/// <summary>
+/// Eviction API.
+/// </summary>
+namespace Apache.Ignite.Core.Cache.Eviction
+{
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Package-Info.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Package-Info.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Package-Info.cs
new file mode 100644
index 0000000..b504809
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Package-Info.cs
@@ -0,0 +1,26 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#pragma warning disable 1587   // invalid XML comment
+
+/// <summary>
+/// Communication API.
+/// </summary>
+namespace Apache.Ignite.Core.Communication
+{
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Tcp/Package-Info.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Tcp/Package-Info.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Tcp/Package-Info.cs
new file mode 100644
index 0000000..b33e01f
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Tcp/Package-Info.cs
@@ -0,0 +1,26 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#pragma warning disable 1587   // invalid XML comment
+
+/// <summary>
+/// TCP communication API.
+/// </summary>
+namespace Apache.Ignite.Core.Communication.Tcp
+{
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs
index cb1c715..ee1c837 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs
@@ -32,16 +32,14 @@ namespace Apache.Ignite.Core.Events
         /// <param name="reader">Reader.</param>
         /// <returns>Deserialized event.</returns>
         /// <exception cref="System.InvalidCastException">Incompatible event type.</exception>
-        public static T Read<T>(IBinaryReader reader) where T : IEvent
+        public static T Read<T>(IBinaryRawReader reader) where T : IEvent
         {
-            var r = reader.GetRawReader();
-
-            var clsId = r.ReadInt();
+            var clsId = reader.ReadInt();
 
             if (clsId == -1)
                 return default(T);
 
-            return (T) CreateInstance(clsId, r);
+            return (T) CreateInstance(clsId, reader);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
index c551735..ee6b1ce 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
@@ -27,10 +27,12 @@ namespace Apache.Ignite.Core
     using System.Runtime;
     using System.Threading;
     using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Cache.Affinity;
     using Apache.Ignite.Core.Common;
     using Apache.Ignite.Core.Impl;
     using Apache.Ignite.Core.Impl.Binary;
     using Apache.Ignite.Core.Impl.Binary.IO;
+    using Apache.Ignite.Core.Impl.Cache.Affinity;
     using Apache.Ignite.Core.Impl.Common;
     using Apache.Ignite.Core.Impl.Handle;
     using Apache.Ignite.Core.Impl.Memory;
@@ -261,6 +263,8 @@ namespace Apache.Ignite.Core
                 PrepareConfiguration(reader, outStream);
 
                 PrepareLifecycleBeans(reader, outStream, handleRegistry);
+
+                PrepareAffinityFunctions(reader, outStream);
             }
             catch (Exception e)
             {
@@ -314,7 +318,7 @@ namespace Apache.Ignite.Core
             int cnt = reader.ReadInt();
 
             for (int i = 0; i < cnt; i++)
-                beans.Add(new LifecycleBeanHolder(CreateLifecycleBean(reader)));
+                beans.Add(new LifecycleBeanHolder(CreateObject<ILifecycleBean>(reader)));
 
             // 2. Append beans definied in local configuration.
             ICollection<ILifecycleBean> nativeBeans = _startup.Configuration.LifecycleBeans;
@@ -338,21 +342,33 @@ namespace Apache.Ignite.Core
         }
 
         /// <summary>
-        /// Create lifecycle bean.
+        /// Prepares the affinity functions.
         /// </summary>
-        /// <param name="reader">Reader.</param>
-        /// <returns>Lifecycle bean.</returns>
-        private static ILifecycleBean CreateLifecycleBean(BinaryReader reader)
+        private static void PrepareAffinityFunctions(BinaryReader reader, PlatformMemoryStream outStream)
         {
-            // 1. Instantiate.
-            var bean = IgniteUtils.CreateInstance<ILifecycleBean>(reader.ReadString());
+            var cnt = reader.ReadInt();
+
+            var writer = reader.Marshaller.StartMarshal(outStream);
 
-            // 2. Set properties.
-            var props = reader.ReadDictionaryAsGeneric<string, object>();
+            for (var i = 0; i < cnt; i++)
+            {
+                var objHolder = new ObjectInfoHolder(reader);
+                AffinityFunctionSerializer.Write(writer, objHolder.CreateInstance<IAffinityFunction>(), objHolder);
+            }
+        }
+
+        /// <summary>
+        /// Creates an object and sets the properties.
+        /// </summary>
+        /// <param name="reader">Reader.</param>
+        /// <returns>Resulting object.</returns>
+        private static T CreateObject<T>(IBinaryRawReader reader)
+        {
+            var res =  IgniteUtils.CreateInstance<T>(reader.ReadString());
 
-            IgniteUtils.SetProperties(bean, props);
+            IgniteUtils.SetProperties(res, reader.ReadDictionaryAsGeneric<string, object>());
 
-            return bean;
+            return res;
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
index 87de0eb..77fd7b6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Impl.Binary
     using System;
     using System.Collections.Generic;
     using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Impl.Common;
 
     /// <summary>
     /// Reader extensions.
@@ -75,7 +76,7 @@ namespace Apache.Ignite.Core.Impl.Binary
         {
             return reader.ReadBoolean() ? reader.ReadLongAsTimespan() : (TimeSpan?) null;
         }
-        
+
         /// <summary>
         /// Reads the nullable int.
         /// </summary>
@@ -91,5 +92,18 @@ namespace Apache.Ignite.Core.Impl.Binary
         {
             return reader.ReadBoolean() ? reader.ReadBoolean() : (bool?) null;
         }
+
+        /// <summary>
+        /// Reads the object either as a normal object or as a [typeName+props] wrapper.
+        /// </summary>
+        public static T ReadObjectEx<T>(this IBinaryRawReader reader)
+        {
+            var obj = reader.ReadObject<object>();
+
+            if (obj == null)
+                return default(T);
+
+            return obj is T ? (T)obj : ((ObjectInfoHolder)obj).CreateInstance<T>();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
index aa881bb..473fdf8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
@@ -576,6 +576,7 @@ namespace Apache.Ignite.Core.Impl.Binary
             AddSystemType(BinaryUtils.TypeStreamReceiverHolder, w => new StreamReceiverHolder(w));
             AddSystemType(0, w => new AffinityKey(w), "affKey");
             AddSystemType(BinaryUtils.TypePlatformJavaObjectFactoryProxy, w => new PlatformJavaObjectFactoryProxy());
+            AddSystemType(0, w => new ObjectInfoHolder(w));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionBase.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionBase.cs
new file mode 100644
index 0000000..8536e4c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionBase.cs
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Affinity
+{
+    using System;
+    using System.Collections.Generic;
+    using System.ComponentModel;
+    using Apache.Ignite.Core.Cache.Affinity;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+
+    /// <summary>
+    /// Base class for predefined affinity functions.
+    /// </summary>
+    [Serializable]
+    public abstract class AffinityFunctionBase : IAffinityFunction
+    {
+        /// <summary> The default value for <see cref="Partitions"/> property. </summary>
+        public const int DefaultPartitions = 1024;
+
+        /** */
+        private int _partitions = DefaultPartitions;
+
+        /** */
+        private IAffinityFunction _baseFunction;
+
+
+        /// <summary>
+        /// Gets or sets the total number of partitions.
+        /// </summary>
+        [DefaultValue(DefaultPartitions)]
+        public virtual int Partitions
+        {
+            get { return _partitions; }
+            set { _partitions = value; }
+        }
+
+        /// <summary>
+        /// Gets partition number for a given key starting from 0. Partitioned caches
+        /// should make sure that keys are about evenly distributed across all partitions
+        /// from 0 to <see cref="Partitions" /> for best performance.
+        /// <para />
+        /// Note that for fully replicated caches it is possible to segment key sets among different
+        /// grid node groups. In that case each node group should return a unique partition
+        /// number. However, unlike partitioned cache, mappings of keys to nodes in
+        /// replicated caches are constant and a node cannot migrate from one partition
+        /// to another.
+        /// </summary>
+        /// <param name="key">Key to get partition for.</param>
+        /// <returns>
+        /// Partition number for a given key.
+        /// </returns>
+        public virtual int GetPartition(object key)
+        {
+            ThrowIfUninitialized();
+
+            return _baseFunction.GetPartition(key);
+        }
+
+        /// <summary>
+        /// Removes node from affinity. This method is called when it is safe to remove
+        /// disconnected node from affinity mapping.
+        /// </summary>
+        /// <param name="nodeId">The node identifier.</param>
+        public virtual void RemoveNode(Guid nodeId)
+        {
+            ThrowIfUninitialized();
+
+            _baseFunction.RemoveNode(nodeId);
+        }
+
+        /// <summary>
+        /// Gets affinity nodes for a partition. In case of replicated cache, all returned
+        /// nodes are updated in the same manner. In case of partitioned cache, the returned
+        /// list should contain only the primary and back up nodes with primary node being
+        /// always first.
+        /// <pare />
+        /// Note that partitioned affinity must obey the following contract: given that node
+        /// <code>N</code> is primary for some key <code>K</code>, if any other node(s) leave
+        /// grid and no node joins grid, node <code>N</code> will remain primary for key <code>K</code>.
+        /// </summary>
+        /// <param name="context">The affinity function context.</param>
+        /// <returns>
+        /// A collection of partitions, where each partition is a collection of nodes,
+        /// where first node is a primary node, and other nodes are backup nodes.
+        /// </returns>
+        public virtual IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context)
+        {
+            ThrowIfUninitialized();
+
+            return _baseFunction.AssignPartitions(context);
+        }
+
+        /// <summary>
+        /// Gets or sets a value indicating whether to exclude same-host-neighbors from being backups of each other.
+        /// </summary>
+        public virtual bool ExcludeNeighbors { get; set; }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="AffinityFunctionBase"/> class.
+        /// </summary>
+        internal AffinityFunctionBase()
+        {
+            // No-op.
+        }
+
+        /// <summary>
+        /// Sets the base function.
+        /// </summary>
+        /// <param name="baseFunc">The base function.</param>
+        internal void SetBaseFunction(IAffinityFunction baseFunc)
+        {
+            _baseFunction = baseFunc;
+        }
+
+        /// <summary>
+        /// Gets the direct usage error.
+        /// </summary>
+        private void ThrowIfUninitialized()
+        {
+            if (_baseFunction == null)
+                throw new IgniteException(GetType() + " has not yet been initialized.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs
new file mode 100644
index 0000000..888445a
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Affinity
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.IO;
+    using System.Linq;
+    using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Cache.Affinity;
+    using Apache.Ignite.Core.Cache.Affinity.Fair;
+    using Apache.Ignite.Core.Cache.Affinity.Rendezvous;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Impl.Binary;
+    using Apache.Ignite.Core.Impl.Binary.IO;
+    using Apache.Ignite.Core.Impl.Memory;
+
+    /// <summary>
+    /// Affinity function read/write methods.
+    /// </summary>
+    internal static class AffinityFunctionSerializer
+    {
+        /** */
+        private const byte TypeCodeNull = 0;
+
+        /** */
+        private const byte TypeCodeFair = 1;
+
+        /** */
+        private const byte TypeCodeRendezvous = 2;
+
+        /** */
+        private const byte TypeCodeUser = 3;
+
+        /// <summary>
+        /// Writes the instance.
+        /// </summary>
+        internal static void Write(IBinaryRawWriter writer, IAffinityFunction fun, object userFuncOverride = null)
+        {
+            Debug.Assert(writer != null);
+
+            if (fun == null)
+            {
+                writer.WriteByte(TypeCodeNull);
+                return;
+            }
+
+            // 1) Type code
+            // 2) Partitions
+            // 3) ExcludeNeighbors
+            // 4) Override flags
+            // 5) User object
+
+            var p = fun as AffinityFunctionBase;
+
+            if (p != null)
+            {
+                writer.WriteByte(p is FairAffinityFunction ? TypeCodeFair : TypeCodeRendezvous);
+                writer.WriteInt(p.Partitions);
+                writer.WriteBoolean(p.ExcludeNeighbors);
+
+                var overrideFlags = GetOverrideFlags(p.GetType());
+                writer.WriteByte((byte) overrideFlags);
+
+                // Do not write user func if there is nothing overridden
+                WriteUserFunc(writer, overrideFlags != UserOverrides.None ? fun : null, userFuncOverride);
+            }
+            else
+            {
+                writer.WriteByte(TypeCodeUser);
+                writer.WriteInt(fun.Partitions);
+                writer.WriteBoolean(false); // Exclude neighbors
+                writer.WriteByte((byte) UserOverrides.All);
+                WriteUserFunc(writer, fun, userFuncOverride);
+            }
+        }
+
+        /// <summary>
+        /// Reads the instance.
+        /// </summary>
+        internal static IAffinityFunction Read(IBinaryRawReader reader)
+        {
+            Debug.Assert(reader != null);
+
+            var typeCode = reader.ReadByte();
+
+            if (typeCode == TypeCodeNull)
+                return null;
+
+            var partitions = reader.ReadInt();
+            var exclNeighbors = reader.ReadBoolean();
+            var overrideFlags = (UserOverrides)reader.ReadByte();
+            var userFunc = reader.ReadObjectEx<IAffinityFunction>();
+
+            if (userFunc != null)
+            {
+                Debug.Assert(overrideFlags != UserOverrides.None);
+
+                var fair = userFunc as FairAffinityFunction;
+                if (fair != null)
+                {
+                    fair.Partitions = partitions;
+                    fair.ExcludeNeighbors = exclNeighbors;
+                }
+
+                var rendezvous = userFunc as RendezvousAffinityFunction;
+                if (rendezvous != null)
+                {
+                    rendezvous.Partitions = partitions;
+                    rendezvous.ExcludeNeighbors = exclNeighbors;
+                }
+
+                return userFunc;
+            }
+
+            Debug.Assert(overrideFlags == UserOverrides.None);
+            AffinityFunctionBase fun;
+
+            switch (typeCode)
+            {
+                case TypeCodeFair:
+                    fun = new FairAffinityFunction();
+                    break;
+                case TypeCodeRendezvous:
+                    fun = new RendezvousAffinityFunction();
+                    break;
+                default:
+                    throw new InvalidOperationException("Invalid AffinityFunction type code: " + typeCode);
+            }
+
+            fun.Partitions = partitions;
+            fun.ExcludeNeighbors = exclNeighbors;
+
+            return fun;
+        }
+
+
+        /// <summary>
+        /// Writes the partitions assignment to a stream.
+        /// </summary>
+        /// <param name="parts">The parts.</param>
+        /// <param name="stream">The stream.</param>
+        /// <param name="marsh">The marshaller.</param>
+        internal static void WritePartitions(IEnumerable<IEnumerable<IClusterNode>> parts,
+            PlatformMemoryStream stream, Marshaller marsh)
+        {
+            Debug.Assert(parts != null);
+            Debug.Assert(stream != null);
+            Debug.Assert(marsh != null);
+
+            IBinaryRawWriter writer = marsh.StartMarshal(stream);
+
+            var partCnt = 0;
+            writer.WriteInt(partCnt); // reserve size
+
+            foreach (var part in parts)
+            {
+                if (part == null)
+                    throw new IgniteException("IAffinityFunction.AssignPartitions() returned invalid partition: null");
+
+                partCnt++;
+
+                var nodeCnt = 0;
+                var cntPos = stream.Position;
+                writer.WriteInt(nodeCnt); // reserve size
+
+                foreach (var node in part)
+                {
+                    nodeCnt++;
+                    writer.WriteGuid(node.Id);
+                }
+
+                var endPos = stream.Position;
+                stream.Seek(cntPos, SeekOrigin.Begin);
+                stream.WriteInt(nodeCnt);
+                stream.Seek(endPos, SeekOrigin.Begin);
+            }
+
+            stream.SynchronizeOutput();
+            stream.Seek(0, SeekOrigin.Begin);
+            writer.WriteInt(partCnt);
+        }
+
+        /// <summary>
+        /// Reads the partitions assignment from a stream.
+        /// </summary>
+        /// <param name="stream">The stream.</param>
+        /// <param name="marsh">The marshaller.</param>
+        /// <returns>Partitions assignment.</returns>
+        internal static IEnumerable<IEnumerable<IClusterNode>> ReadPartitions(IBinaryStream stream, Marshaller marsh)
+        {
+            Debug.Assert(stream != null);
+            Debug.Assert(marsh != null);
+
+            IBinaryRawReader reader = marsh.StartUnmarshal(stream);
+
+            var partCnt = reader.ReadInt();
+
+            var res = new List<IEnumerable<IClusterNode>>(partCnt);
+
+            for (var i = 0; i < partCnt; i++)
+                res.Add(IgniteUtils.ReadNodes(reader));
+
+            return res;
+        }
+
+        /// <summary>
+        /// Gets the override flags.
+        /// </summary>
+        private static UserOverrides GetOverrideFlags(Type funcType)
+        {
+            var res = UserOverrides.None;
+
+            var methods = new[] {UserOverrides.GetPartition, UserOverrides.AssignPartitions, UserOverrides.RemoveNode};
+
+            var map = funcType.GetInterfaceMap(typeof(IAffinityFunction));
+
+            foreach (var method in methods)
+            {
+                // Find whether user type overrides IAffinityFunction method from AffinityFunctionBase.
+                var methodName = method.ToString();
+
+                if (map.TargetMethods.Single(x => x.Name == methodName).DeclaringType != typeof(AffinityFunctionBase))
+                    res |= method;
+            }
+
+            return res;
+        }
+
+        /// <summary>
+        /// Writes the user function.
+        /// </summary>
+        private static void WriteUserFunc(IBinaryRawWriter writer, IAffinityFunction func, object funcOverride)
+        {
+            if (funcOverride != null)
+            {
+                writer.WriteObject(funcOverride);
+                return;
+            }
+
+            if (func != null && !func.GetType().IsSerializable)
+                throw new IgniteException("AffinityFunction should be serializable.");
+
+            writer.WriteObject(func);
+        }
+
+        /// <summary>
+        /// Overridden function flags.
+        /// </summary>
+        [Flags]
+        private enum UserOverrides : byte
+        {
+            None = 0,
+            GetPartition = 1,
+            RemoveNode = 1 << 1,
+            AssignPartitions = 1 << 2,
+            All = GetPartition | RemoveNode | AssignPartitions
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
new file mode 100644
index 0000000..d335804
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Affinity
+{
+    using System;
+    using System.Collections.Generic;
+    using Apache.Ignite.Core.Cache.Affinity;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Impl.Binary;
+    using Apache.Ignite.Core.Impl.Unmanaged;
+
+    /// <summary>
+    /// Affinity function that delegates to Java.
+    /// </summary>
+    internal class PlatformAffinityFunction : PlatformTarget, IAffinityFunction
+    {
+        /** Opcodes. */
+        private enum  Op
+        {
+            Partition = 1,
+            RemoveNode = 2,
+            AssignPartitions = 3
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="PlatformAffinityFunction"/> class.
+        /// </summary>
+        /// <param name="target">Target.</param>
+        /// <param name="marsh">Marshaller.</param>
+        public PlatformAffinityFunction(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh)
+        {
+            // No-op.
+        }
+
+        /** <inheritdoc /> */
+        public int Partitions
+        {
+            get { throw new NotSupportedException("PlatformAffinityFunction.Partitions is not supported."); }
+        }
+
+        /** <inheritdoc /> */
+        public int GetPartition(object key)
+        {
+            return (int) DoOutOp((int) Op.Partition, w => w.WriteObject(key));
+        }
+
+        /** <inheritdoc /> */
+        public void RemoveNode(Guid nodeId)
+        {
+            DoOutOp((int) Op.RemoveNode, w => w.WriteGuid(nodeId));
+        }
+
+        /** <inheritdoc /> */
+        public IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context)
+        {
+            return DoInOp((int) Op.AssignPartitions, s => AffinityFunctionSerializer.ReadPartitions(s, Marshaller));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs
new file mode 100644
index 0000000..407fe0c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Impl.Binary;
+
+    /// <summary>
+    /// Holds the information to instantiate an object and set its properties.
+    /// Typically used for .NET objects defined in Spring XML.
+    /// </summary>
+    internal class ObjectInfoHolder : IBinaryWriteAware
+    {
+        /** Type name. */
+        private readonly string _typeName;
+
+        /** Properties. */
+        private readonly Dictionary<string, object> _properties;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ObjectInfoHolder"/> class.
+        /// </summary>
+        /// <param name="reader">The reader.</param>
+        public ObjectInfoHolder(IBinaryRawReader reader)
+        {
+            Debug.Assert(reader != null);
+
+            _typeName = reader.ReadString();
+            _properties = reader.ReadDictionaryAsGeneric<string, object>();
+
+            Debug.Assert(!string.IsNullOrEmpty(_typeName));
+        }
+
+        /// <summary>
+        /// Gets the name of the type.
+        /// </summary>
+        public string TypeName
+        {
+            get { return _typeName; }
+        }
+
+        /// <summary>
+        /// Gets the properties.
+        /// </summary>
+        public Dictionary<string, object> Properties
+        {
+            get { return _properties; }
+        }
+
+        /// <summary>
+        /// Creates an instance according to type name and properties.
+        /// </summary>
+        public T CreateInstance<T>()
+        {
+            return IgniteUtils.CreateInstance<T>(TypeName, Properties);
+        }
+
+        /** <inheritdoc /> */
+        public void WriteBinary(IBinaryWriter writer)
+        {
+            Debug.Assert(writer != null);
+
+            var w = writer.GetRawWriter();
+
+            w.WriteString(_typeName);
+            w.WriteDictionary(_properties);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
index 5c19802..e43e332 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
@@ -139,8 +139,9 @@ namespace Apache.Ignite.Core.Impl
         /// Create new instance of specified class.
         /// </summary>
         /// <param name="typeName">Class name</param>
+        /// <param name="props">Properties to set.</param>
         /// <returns>New Instance.</returns>
-        public static T CreateInstance<T>(string typeName)
+        public static T CreateInstance<T>(string typeName, IEnumerable<KeyValuePair<string, object>> props = null)
         {
             IgniteArgumentCheck.NotNullOrEmpty(typeName, "typeName");
 
@@ -149,7 +150,12 @@ namespace Apache.Ignite.Core.Impl
             if (type == null)
                 throw new IgniteException("Failed to create class instance [className=" + typeName + ']');
 
-            return (T) Activator.CreateInstance(type);
+            var res = (T)Activator.CreateInstance(type);
+
+            if (props != null)
+                SetProperties(res, props);
+
+            return res;
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs
index fb52033..51d9c74 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs
@@ -98,5 +98,11 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
 
         internal void* onClientDisconnected;
         internal void* ocClientReconnected;
+
+        internal void* affinityFunctionInit;
+        internal void* affinityFunctionPartition;
+        internal void* affinityFunctionAssignPartitions;
+        internal void* affinityFunctionRemoveNode;
+        internal void* affinityFunctionDestroy;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index f0e881c..b3489a2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -21,14 +21,16 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
     using System.Collections.Generic;
     using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
+    using System.IO;
     using System.Runtime.InteropServices;
     using System.Threading;
-
+    using Apache.Ignite.Core.Cache.Affinity;
     using Apache.Ignite.Core.Cluster;
     using Apache.Ignite.Core.Common;
     using Apache.Ignite.Core.Impl.Binary;
     using Apache.Ignite.Core.Impl.Binary.IO;
     using Apache.Ignite.Core.Impl.Cache;
+    using Apache.Ignite.Core.Impl.Cache.Affinity;
     using Apache.Ignite.Core.Impl.Cache.Query.Continuous;
     using Apache.Ignite.Core.Impl.Cache.Store;
     using Apache.Ignite.Core.Impl.Common;
@@ -165,6 +167,12 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
         private delegate void OnClientDisconnectedDelegate(void* target);
         private delegate void OnClientReconnectedDelegate(void* target, bool clusterRestarted);
 
+        private delegate long AffinityFunctionInitDelegate(void* target, long memPtr, void* baseFunc);
+        private delegate int AffinityFunctionPartitionDelegate(void* target, long ptr, long memPtr);
+        private delegate void AffinityFunctionAssignPartitionsDelegate(void* target, long ptr, long inMemPtr, long outMemPtr);
+        private delegate void AffinityFunctionRemoveNodeDelegate(void* target, long ptr, long memPtr);
+        private delegate void AffinityFunctionDestroyDelegate(void* target, long ptr);
+
         /// <summary>
         /// constructor.
         /// </summary>
@@ -248,6 +256,12 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
 
                 onClientDisconnected = CreateFunctionPointer((OnClientDisconnectedDelegate)OnClientDisconnected),
                 ocClientReconnected = CreateFunctionPointer((OnClientReconnectedDelegate)OnClientReconnected),
+
+                affinityFunctionInit = CreateFunctionPointer((AffinityFunctionInitDelegate)AffinityFunctionInit),
+                affinityFunctionPartition = CreateFunctionPointer((AffinityFunctionPartitionDelegate)AffinityFunctionPartition),
+                affinityFunctionAssignPartitions = CreateFunctionPointer((AffinityFunctionAssignPartitionsDelegate)AffinityFunctionAssignPartitions),
+                affinityFunctionRemoveNode = CreateFunctionPointer((AffinityFunctionRemoveNodeDelegate)AffinityFunctionRemoveNode),
+                affinityFunctionDestroy = CreateFunctionPointer((AffinityFunctionDestroyDelegate)AffinityFunctionDestroy)
             };
 
             _cbsPtr = Marshal.AllocHGlobal(UU.HandlersSize());
@@ -612,7 +626,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
         }
 
         [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
-        private void DataStreamerStreamReceiverInvoke(void* target, long rcvPtr, void* cache, long memPtr, 
+        private void DataStreamerStreamReceiverInvoke(void* target, long rcvPtr, void* cache, long memPtr,
             byte keepBinary)
         {
             SafeCall(() =>
@@ -1089,6 +1103,119 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
 
         #endregion
 
+        #region AffinityFunction
+
+        private long AffinityFunctionInit(void* target, long memPtr, void* baseFunc)
+        {
+            return SafeCall(() =>
+            {
+                using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+                {
+                    var reader = _ignite.Marshaller.StartUnmarshal(stream);
+
+                    var func = reader.ReadObjectEx<IAffinityFunction>();
+
+                    ResourceProcessor.Inject(func, _ignite);
+
+                    var affBase = func as AffinityFunctionBase;
+
+                    if (affBase != null)
+                        affBase.SetBaseFunction(new PlatformAffinityFunction(
+                            _ignite.InteropProcessor.ChangeTarget(baseFunc), _ignite.Marshaller));
+
+                    return _handleRegistry.Allocate(func);
+                }
+            });
+        }
+
+        private int AffinityFunctionPartition(void* target, long ptr, long memPtr)
+        {
+            return SafeCall(() =>
+            {
+                using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+                {
+                    var key = _ignite.Marshaller.Unmarshal<object>(stream);
+
+                    return _handleRegistry.Get<IAffinityFunction>(ptr, true).GetPartition(key);
+                }
+            });
+        }
+
+        private void AffinityFunctionAssignPartitions(void* target, long ptr, long inMemPtr, long outMemPtr)
+        {
+            SafeCall(() =>
+            {
+                using (var inStream = IgniteManager.Memory.Get(inMemPtr).GetStream())
+                {
+                    var ctx = new AffinityFunctionContext(_ignite.Marshaller.StartUnmarshal(inStream));
+                    var func = _handleRegistry.Get<IAffinityFunction>(ptr, true);
+                    var parts = func.AssignPartitions(ctx);
+
+                    if (parts == null)
+                        throw new IgniteException(func.GetType() + ".AssignPartitions() returned invalid result: null");
+
+                    using (var outStream = IgniteManager.Memory.Get(outMemPtr).GetStream())
+                    {
+                        var writer = _ignite.Marshaller.StartMarshal(outStream);
+
+                        var partCnt = 0;
+                        writer.WriteInt(partCnt);  // reserve size
+
+                        foreach (var part in parts)
+                        {
+                            if (part == null)
+                                throw new IgniteException(func.GetType() +
+                                                          ".AssignPartitions() returned invalid partition: null");
+
+                            partCnt++;
+
+                            var nodeCnt = 0;
+                            var cntPos = outStream.Position;
+                            writer.WriteInt(nodeCnt);  // reserve size
+
+                            foreach (var node in part)
+                            {
+                                nodeCnt++;
+                                writer.WriteGuid(node.Id);
+                            }
+
+                            var endPos = outStream.Position;
+                            outStream.Seek(cntPos, SeekOrigin.Begin);
+                            outStream.WriteInt(nodeCnt);
+                            outStream.Seek(endPos, SeekOrigin.Begin);
+                        }
+
+                        outStream.SynchronizeOutput();
+                        outStream.Seek(0, SeekOrigin.Begin);
+                        writer.WriteInt(partCnt);
+                    }
+                }
+            });
+        }
+
+        private void AffinityFunctionRemoveNode(void* target, long ptr, long memPtr)
+        {
+            SafeCall(() =>
+            {
+                using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+                {
+                    var nodeId = _ignite.Marshaller.Unmarshal<Guid>(stream);
+
+                    _handleRegistry.Get<IAffinityFunction>(ptr, true).RemoveNode(nodeId);
+                }
+            });
+        }
+
+        private void AffinityFunctionDestroy(void* target, long ptr)
+        {
+            SafeCall(() =>
+            {
+                _handleRegistry.Release(ptr);
+            });
+        }
+
+        #endregion
+
         #region HELPERS
 
         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]


Mime
View raw message