ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [34/50] [abbrv] ignite git commit: GG-11293: .NET: Backported affinity functions feature to 7.5.30.
Date Tue, 06 Sep 2016 14:39:56 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs
new file mode 100644
index 0000000..6067af4
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity
+{
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Events;
+    using Apache.Ignite.Core.Impl;
+
+    /// <summary>
+    /// Affinity function context.
+    /// </summary>
+    public class AffinityFunctionContext
+    {
+        /** */
+        private readonly List<List<IClusterNode>> _previousAssignment;
+
+        /** */
+        private readonly int _backups;
+
+        /** */
+        private readonly ICollection<IClusterNode> _currentTopologySnapshot;
+
+        /** */
+        private readonly AffinityTopologyVersion _currentTopologyVersion;
+
+        /** */
+        private readonly DiscoveryEvent _discoveryEvent;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="AffinityFunctionContext"/> class.
+        /// </summary>
+        /// <param name="reader">The reader.</param>
+        internal AffinityFunctionContext(IBinaryRawReader reader)
+        {
+            Debug.Assert(reader != null);
+
+            _currentTopologySnapshot = IgniteUtils.ReadNodes(reader);
+            _backups = reader.ReadInt();
+            _currentTopologyVersion = new AffinityTopologyVersion(reader.ReadLong(), reader.ReadInt());
+            _discoveryEvent = EventReader.Read<DiscoveryEvent>(reader);
+
+            // Prev assignment
+            var cnt = reader.ReadInt();
+
+            if (cnt > 0)
+            {
+                _previousAssignment = new List<List<IClusterNode>>(cnt);
+
+                for (var i = 0; i < cnt; i++)
+                    _previousAssignment.Add(IgniteUtils.ReadNodes(reader));
+            }
+        }
+
+        /// <summary>
+        /// Gets the affinity assignment for given partition on previous topology version.
+        /// First node in returned list is a primary node, other nodes are backups.
+        /// </summary>
+        /// <param name="partition">The partition to get previous assignment for.</param>
+        /// <returns>
+        /// List of nodes assigned to a given partition on previous topology version or <code>null</code>
+        /// if this information is not available.
+        /// </returns>
+        public ICollection<IClusterNode> GetPreviousAssignment(int partition)
+        {
+            return _previousAssignment == null ? null : _previousAssignment[partition];
+        }
+
+        /// <summary>
+        /// Gets number of backups for new assignment.
+        /// </summary>
+        public int Backups
+        {
+            get { return _backups; }
+        }
+
+        /// <summary>
+        /// Gets the current topology snapshot. Snapshot will contain only nodes on which the particular
+        /// cache is configured. List of passed nodes is guaranteed to be sorted in a same order
+        /// on all nodes on which partition assignment is performed.
+        /// </summary>
+        public ICollection<IClusterNode> CurrentTopologySnapshot
+        {
+            get { return _currentTopologySnapshot; }
+        }
+
+        /// <summary>
+        /// Gets the current topology version.
+        /// </summary>
+        public AffinityTopologyVersion CurrentTopologyVersion
+        {
+            get { return _currentTopologyVersion; }
+        }
+
+        /// <summary>
+        /// Gets the discovery event that caused the topology change.
+        /// </summary>
+        public DiscoveryEvent DiscoveryEvent
+        {
+            get { return _discoveryEvent; }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs
new file mode 100644
index 0000000..9bfdfb4
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity
+{
+    using System;
+    using Apache.Ignite.Core.Cluster;
+
+    /// <summary>
+    /// Affinity topology version.
+    /// </summary>
+    public struct AffinityTopologyVersion : IEquatable<AffinityTopologyVersion>
+    {
+        /** */
+        private readonly long _version;
+
+        /** */
+        private readonly int _minorVersion;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="AffinityTopologyVersion"/> struct.
+        /// </summary>
+        /// <param name="version">The version.</param>
+        /// <param name="minorVersion">The minor version.</param>
+        public AffinityTopologyVersion(long version, int minorVersion)
+        {
+            _version = version;
+            _minorVersion = minorVersion;
+        }
+
+        /// <summary>
+        /// Gets the major version, same as <see cref="ICluster.TopologyVersion"/>.
+        /// </summary>
+        public long Version
+        {
+            get { return _version; }
+        }
+
+        /// <summary>
+        /// Gets the minor version, which is increased when new caches start.
+        /// </summary>
+        public int MinorVersion
+        {
+            get { return _minorVersion; }
+        }
+
+        /// <summary>
+        /// Indicates whether the current object is equal to another object of the same type.
+        /// </summary>
+        /// <param name="other">An object to compare with this object.</param>
+        /// <returns>
+        /// true if the current object is equal to the <paramref name="other" /> parameter; otherwise, false.
+        /// </returns>
+        public bool Equals(AffinityTopologyVersion other)
+        {
+            return _version == other._version && _minorVersion == other._minorVersion;
+        }
+
+        /// <summary>
+        /// Determines whether the specified <see cref="System.Object" />, is equal to this instance.
+        /// </summary>
+        /// <param name="obj">The <see cref="System.Object" /> to compare with this instance.</param>
+        /// <returns>
+        /// <c>true</c> if the specified <see cref="System.Object" /> is equal to this instance; otherwise, 
+        /// <c>false</c>.
+        /// </returns>
+        public override bool Equals(object obj)
+        {
+            if (ReferenceEquals(null, obj)) return false;
+            return obj is AffinityTopologyVersion && Equals((AffinityTopologyVersion) obj);
+        }
+
+        /// <summary>
+        /// Returns a hash code for this instance.
+        /// </summary>
+        /// <returns>
+        /// A hash code for this instance, suitable for use in hashing algorithms and data structures like a hash table. 
+        /// </returns>
+        public override int GetHashCode()
+        {
+            unchecked
+            {
+                return (_version.GetHashCode()*397) ^ _minorVersion;
+            }
+        }
+
+        /// <summary>
+        /// Implements the operator ==.
+        /// </summary>
+        /// <param name="left">The left.</param>
+        /// <param name="right">The right.</param>
+        /// <returns>
+        /// The result of the operator.
+        /// </returns>
+        public static bool operator ==(AffinityTopologyVersion left, AffinityTopologyVersion right)
+        {
+            return left.Equals(right);
+        }
+
+        /// <summary>
+        /// Implements the operator !=.
+        /// </summary>
+        /// <param name="left">The left.</param>
+        /// <param name="right">The right.</param>
+        /// <returns>
+        /// The result of the operator.
+        /// </returns>
+        public static bool operator !=(AffinityTopologyVersion left, AffinityTopologyVersion right)
+        {
+            return !left.Equals(right);
+        }
+
+        /// <summary>
+        /// Returns a <see cref="string" /> that represents this instance.
+        /// </summary>
+        /// <returns>
+        /// A <see cref="string" /> that represents this instance.
+        /// </returns>
+        public override string ToString()
+        {
+            return string.Format("AffinityTopologyVersion [Version={0}, MinorVersion={1}]", _version, _minorVersion);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs
new file mode 100644
index 0000000..4a3885f
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity.Fair
+{
+    using System;
+
+    /// <summary>
+    /// Fair affinity function which tries to ensure that all nodes get equal number of partitions with 
+    /// minimum amount of reassignments between existing nodes.
+    /// </summary>
+    [Serializable]
+    public class FairAffinityFunction : AffinityFunctionBase
+    {
+        // No-op.
+        // Actual implementation is in Java, see AffinityFunctionSerializer.Write method.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs
new file mode 100644
index 0000000..b6c190c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity
+{
+    using System;
+    using System.Collections.Generic;
+    using Apache.Ignite.Core.Cache.Affinity.Fair;
+    using Apache.Ignite.Core.Cache.Affinity.Rendezvous;
+    using Apache.Ignite.Core.Cluster;
+
+    /// <summary>
+    /// Represents a function that maps cache keys to cluster nodes.
+    /// <para />
+    /// Predefined implementations: 
+    /// <see cref="RendezvousAffinityFunction"/>, <see cref="FairAffinityFunction"/>.
+    /// </summary>
+    public interface IAffinityFunction
+    {
+        /// <summary>
+        /// Gets the total number of partitions.
+        /// <para />
+        /// All caches should always provide correct partition count which should be the same on all 
+        /// participating nodes. Note that partitions should always be numbered from 0 inclusively 
+        /// to N exclusively without any gaps.
+        /// </summary>
+        int Partitions { get; }
+
+        /// <summary>
+        /// Gets partition number for a given key starting from 0. Partitioned caches
+        /// should make sure that keys are about evenly distributed across all partitions
+        /// from 0 to <see cref="Partitions"/> for best performance.
+        /// <para />
+        /// Note that for fully replicated caches it is possible to segment key sets among different
+        /// grid node groups. In that case each node group should return a unique partition
+        /// number. However, unlike partitioned cache, mappings of keys to nodes in
+        /// replicated caches are constant and a node cannot migrate from one partition
+        /// to another.
+        /// </summary>
+        /// <param name="key">Key to get partition for.</param>
+        /// <returns>Partition number for a given key.</returns>
+        int GetPartition(object key);
+
+        /// <summary>
+        /// Removes node from affinity. This method is called when it is safe to remove 
+        /// disconnected node from affinity mapping.
+        /// </summary>
+        /// <param name="nodeId">The node identifier.</param>
+        void RemoveNode(Guid nodeId);
+
+        /// <summary>
+        /// Gets affinity nodes for a partition. In case of replicated cache, all returned
+        /// nodes are updated in the same manner. In case of partitioned cache, the returned
+        /// list should contain only the primary and back up nodes with primary node being
+        /// always first.
+        /// <para />
+        /// Note that partitioned affinity must obey the following contract: given that node
+        /// <code>N</code> is primary for some key <code>K</code>, if any other node(s) leave
+        /// grid and no node joins grid, node <code>N</code> will remain primary for key <code>K</code>.
+        /// </summary>
+        /// <param name="context">The affinity function context.</param>
+        /// <returns>
+        /// A collection of partitions, where each partition is a collection of nodes,
+        /// where first node is a primary node, and other nodes are backup nodes.
+        /// </returns>
+        IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs
new file mode 100644
index 0000000..98ec364
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity.Rendezvous
+{
+    using System;
+
+    /// <summary>
+    /// Affinity function for partitioned cache based on Highest Random Weight algorithm.
+    /// </summary>
+    [Serializable]
+    public class RendezvousAffinityFunction : AffinityFunctionBase
+    {
+        // No-op.
+        // Actual implementation is in Java, see AffinityFunctionSerializer.Write method.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs
index cb1c715..ee1c837 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs
@@ -32,16 +32,14 @@ namespace Apache.Ignite.Core.Events
         /// <param name="reader">Reader.</param>
         /// <returns>Deserialized event.</returns>
         /// <exception cref="System.InvalidCastException">Incompatible event type.</exception>
-        public static T Read<T>(IBinaryReader reader) where T : IEvent
+        public static T Read<T>(IBinaryRawReader reader) where T : IEvent
         {
-            var r = reader.GetRawReader();
-
-            var clsId = r.ReadInt();
+            var clsId = reader.ReadInt();
 
             if (clsId == -1)
                 return default(T);
 
-            return (T) CreateInstance(clsId, r);
+            return (T) CreateInstance(clsId, reader);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
index 7a3fafc..f6fab5d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
@@ -26,10 +26,12 @@ namespace Apache.Ignite.Core
     using System.Runtime;
     using System.Threading;
     using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Cache.Affinity;
     using Apache.Ignite.Core.Common;
     using Apache.Ignite.Core.Impl;
     using Apache.Ignite.Core.Impl.Binary;
     using Apache.Ignite.Core.Impl.Binary.IO;
+    using Apache.Ignite.Core.Impl.Cache.Affinity;
     using Apache.Ignite.Core.Impl.Common;
     using Apache.Ignite.Core.Impl.Handle;
     using Apache.Ignite.Core.Impl.Memory;
@@ -233,6 +235,8 @@ namespace Apache.Ignite.Core
                 PrepareConfiguration(reader);
 
                 PrepareLifecycleBeans(reader, outStream, handleRegistry);
+
+                PrepareAffinityFunctions(reader, outStream);
             }
             catch (Exception e)
             {
@@ -282,7 +286,7 @@ namespace Apache.Ignite.Core
             int cnt = reader.ReadInt();
 
             for (int i = 0; i < cnt; i++)
-                beans.Add(new LifecycleBeanHolder(CreateLifecycleBean(reader)));
+                beans.Add(new LifecycleBeanHolder(CreateObject<ILifecycleBean>(reader)));
 
             // 2. Append beans definied in local configuration.
             ICollection<ILifecycleBean> nativeBeans = _startup.Configuration.LifecycleBeans;
@@ -306,21 +310,33 @@ namespace Apache.Ignite.Core
         }
 
         /// <summary>
-        /// Create lifecycle bean.
+        /// Prepares the affinity functions.
         /// </summary>
-        /// <param name="reader">Reader.</param>
-        /// <returns>Lifecycle bean.</returns>
-        private static ILifecycleBean CreateLifecycleBean(BinaryReader reader)
+        private static void PrepareAffinityFunctions(BinaryReader reader, PlatformMemoryStream outStream)
         {
-            // 1. Instantiate.
-            var bean = IgniteUtils.CreateInstance<ILifecycleBean>(reader.ReadString());
+            var cnt = reader.ReadInt();
+
+            var writer = reader.Marshaller.StartMarshal(outStream);
 
-            // 2. Set properties.
-            var props = reader.ReadDictionaryAsGeneric<string, object>();
+            for (var i = 0; i < cnt; i++)
+            {
+                var objHolder = new ObjectInfoHolder(reader);
+                AffinityFunctionSerializer.Write(writer, objHolder.CreateInstance<IAffinityFunction>(), objHolder);
+            }
+        }
+
+        /// <summary>
+        /// Creates an object and sets the properties.
+        /// </summary>
+        /// <param name="reader">Reader.</param>
+        /// <returns>Resulting object.</returns>
+        private static T CreateObject<T>(IBinaryRawReader reader)
+        {
+            var res =  IgniteUtils.CreateInstance<T>(reader.ReadString());
 
-            IgniteUtils.SetProperties(bean, props);
+            IgniteUtils.SetProperties(res, reader.ReadDictionaryAsGeneric<string, object>());
 
-            return bean;
+            return res;
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
index c3dcc3a..a5446ac 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Impl.Binary
 {
     using System.Collections.Generic;
     using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Impl.Common;
 
     /// <summary>
     /// Reader extensions.
@@ -48,5 +49,18 @@ namespace Apache.Ignite.Core.Impl.Binary
         {
             return (Dictionary<TKey, TValue>) reader.ReadDictionary(size => new Dictionary<TKey, TValue>(size));
         }
+
+        /// <summary>
+        /// Reads the object either as a normal object or as a [typeName+props] wrapper.
+        /// </summary>
+        public static T ReadObjectEx<T>(this IBinaryRawReader reader)
+        {
+            var obj = reader.ReadObject<object>();
+
+            if (obj == null)
+                return default(T);
+
+            return obj is T ? (T)obj : ((ObjectInfoHolder)obj).CreateInstance<T>();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
index 81fc195..a63e8f4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
@@ -506,12 +506,15 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// <summary>
         /// Adds a predefined system type.
         /// </summary>
-        private void AddSystemType<T>(byte typeId, Func<BinaryReader, T> ctor) where T : IBinaryWriteAware
+        private void AddSystemType<T>(int typeId, Func<BinaryReader, T> ctor) where T : IBinaryWriteAware
         {
             var type = typeof(T);
 
             var serializer = new BinarySystemTypeSerializer<T>(ctor);
 
+            if (typeId == 0)
+                typeId = BinaryUtils.TypeId(type.Name, null, null);
+
             AddType(type, typeId, BinaryUtils.GetTypeName(type), false, false, null, null, serializer, null, false);
         }
 
@@ -536,6 +539,7 @@ namespace Apache.Ignite.Core.Impl.Binary
             AddSystemType(BinaryUtils.TypeMessageListenerHolder, w => new MessageListenerHolder(w));
             AddSystemType(BinaryUtils.TypeStreamReceiverHolder, w => new StreamReceiverHolder(w));
             AddSystemType(BinaryUtils.TypePlatformJavaObjectFactoryProxy, w => new PlatformJavaObjectFactoryProxy());
+            AddSystemType(0, w => new ObjectInfoHolder(w));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs
new file mode 100644
index 0000000..888445a
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Affinity
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using System.IO;
+    using System.Linq;
+    using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Cache.Affinity;
+    using Apache.Ignite.Core.Cache.Affinity.Fair;
+    using Apache.Ignite.Core.Cache.Affinity.Rendezvous;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Impl.Binary;
+    using Apache.Ignite.Core.Impl.Binary.IO;
+    using Apache.Ignite.Core.Impl.Memory;
+
+    /// <summary>
+    /// Affinity function read/write methods.
+    /// </summary>
+    internal static class AffinityFunctionSerializer
+    {
+        /** Type code written when the affinity function is null. */
+        private const byte TypeCodeNull = 0;
+
+        /** Type code written for FairAffinityFunction (including subclasses). */
+        private const byte TypeCodeFair = 1;
+
+        /** Type code written for every other AffinityFunctionBase (non-fair). */
+        private const byte TypeCodeRendezvous = 2;
+
+        /** Type code written for an IAffinityFunction that is not an AffinityFunctionBase. */
+        private const byte TypeCodeUser = 3;
+
+        /// <summary>
+        /// Writes the affinity function using the layout listed in the method body; null is written as a lone TypeCodeNull byte.
+        /// </summary>
+        internal static void Write(IBinaryRawWriter writer, IAffinityFunction fun, object userFuncOverride = null)
+        {
+            Debug.Assert(writer != null);
+
+            if (fun == null)
+            {
+                writer.WriteByte(TypeCodeNull);
+                return;
+            }
+
+            // 1) Type code
+            // 2) Partitions
+            // 3) ExcludeNeighbors (always false for functions not derived from AffinityFunctionBase)
+            // 4) Override flags (UserOverrides as a byte)
+            // 5) User object (userFuncOverride when it is not null)
+
+            var p = fun as AffinityFunctionBase;
+
+            if (p != null)
+            {
+                writer.WriteByte(p is FairAffinityFunction ? TypeCodeFair : TypeCodeRendezvous);
+                writer.WriteInt(p.Partitions);
+                writer.WriteBoolean(p.ExcludeNeighbors);
+
+                var overrideFlags = GetOverrideFlags(p.GetType());
+                writer.WriteByte((byte) overrideFlags);
+
+                // Pass null when nothing is overridden (a non-null userFuncOverride is still written)
+                WriteUserFunc(writer, overrideFlags != UserOverrides.None ? fun : null, userFuncOverride);
+            }
+            else
+            {
+                writer.WriteByte(TypeCodeUser);
+                writer.WriteInt(fun.Partitions);
+                writer.WriteBoolean(false); // Exclude neighbors
+                writer.WriteByte((byte) UserOverrides.All);
+                WriteUserFunc(writer, fun, userFuncOverride);
+            }
+        }
+
+        /// <summary>
+        /// Reads an affinity function in the layout used by Write; returns null when the type code is TypeCodeNull.
+        /// </summary>
+        internal static IAffinityFunction Read(IBinaryRawReader reader)
+        {
+            Debug.Assert(reader != null);
+
+            var typeCode = reader.ReadByte();
+
+            if (typeCode == TypeCodeNull)
+                return null;
+
+            var partitions = reader.ReadInt();
+            var exclNeighbors = reader.ReadBoolean();
+            var overrideFlags = (UserOverrides)reader.ReadByte();
+            var userFunc = reader.ReadObjectEx<IAffinityFunction>();
+
+            if (userFunc != null)
+            {
+                Debug.Assert(overrideFlags != UserOverrides.None);
+
+                var fair = userFunc as FairAffinityFunction; // built-in subclasses get the serialized Partitions / ExcludeNeighbors applied
+                if (fair != null)
+                {
+                    fair.Partitions = partitions;
+                    fair.ExcludeNeighbors = exclNeighbors;
+                }
+
+                var rendezvous = userFunc as RendezvousAffinityFunction;
+                if (rendezvous != null)
+                {
+                    rendezvous.Partitions = partitions;
+                    rendezvous.ExcludeNeighbors = exclNeighbors;
+                }
+
+                return userFunc;
+            }
+
+            Debug.Assert(overrideFlags == UserOverrides.None); // no user object: rebuild a built-in function from the type code
+            AffinityFunctionBase fun;
+
+            switch (typeCode)
+            {
+                case TypeCodeFair:
+                    fun = new FairAffinityFunction();
+                    break;
+                case TypeCodeRendezvous:
+                    fun = new RendezvousAffinityFunction();
+                    break;
+                default:
+                    throw new InvalidOperationException("Invalid AffinityFunction type code: " + typeCode);
+            }
+
+            fun.Partitions = partitions;
+            fun.ExcludeNeighbors = exclNeighbors;
+
+            return fun;
+        }
+
+
+        /// <summary>
+        /// Writes the partitions assignment to a stream: partition count, then a node count and node ids per partition.
+        /// </summary>
+        /// <param name="parts">Partitions, each a sequence of nodes; a null partition causes an IgniteException.</param>
+        /// <param name="stream">The stream.</param>
+        /// <param name="marsh">The marshaller.</param>
+        internal static void WritePartitions(IEnumerable<IEnumerable<IClusterNode>> parts,
+            PlatformMemoryStream stream, Marshaller marsh)
+        {
+            Debug.Assert(parts != null);
+            Debug.Assert(stream != null);
+            Debug.Assert(marsh != null);
+
+            IBinaryRawWriter writer = marsh.StartMarshal(stream);
+
+            var partCnt = 0;
+            writer.WriteInt(partCnt); // placeholder; the real partition count is written after the loop
+
+            foreach (var part in parts)
+            {
+                if (part == null)
+                    throw new IgniteException("IAffinityFunction.AssignPartitions() returned invalid partition: null");
+
+                partCnt++;
+
+                var nodeCnt = 0;
+                var cntPos = stream.Position;
+                writer.WriteInt(nodeCnt); // placeholder; overwritten at cntPos once the nodes are counted
+
+                foreach (var node in part)
+                {
+                    nodeCnt++;
+                    writer.WriteGuid(node.Id);
+                }
+
+                var endPos = stream.Position;
+                stream.Seek(cntPos, SeekOrigin.Begin);
+                stream.WriteInt(nodeCnt);
+                stream.Seek(endPos, SeekOrigin.Begin);
+            }
+
+            stream.SynchronizeOutput();
+            stream.Seek(0, SeekOrigin.Begin);
+            writer.WriteInt(partCnt);
+        }
+
+        /// <summary>
+        /// Reads the partitions assignment from a stream: an int partition count, then one IgniteUtils.ReadNodes call per partition.
+        /// </summary>
+        /// <param name="stream">The stream.</param>
+        /// <param name="marsh">The marshaller.</param>
+        /// <returns>List holding one node sequence per partition.</returns>
+        internal static IEnumerable<IEnumerable<IClusterNode>> ReadPartitions(IBinaryStream stream, Marshaller marsh)
+        {
+            Debug.Assert(stream != null);
+            Debug.Assert(marsh != null);
+
+            IBinaryRawReader reader = marsh.StartUnmarshal(stream);
+
+            var partCnt = reader.ReadInt();
+
+            var res = new List<IEnumerable<IClusterNode>>(partCnt);
+
+            for (var i = 0; i < partCnt; i++)
+                res.Add(IgniteUtils.ReadNodes(reader));
+
+            return res;
+        }
+
+        /// <summary>
+        /// Gets flags for the IAffinityFunction methods whose implementation is declared by a type other than AffinityFunctionBase.
+        /// </summary>
+        private static UserOverrides GetOverrideFlags(Type funcType)
+        {
+            var res = UserOverrides.None;
+
+            var methods = new[] {UserOverrides.GetPartition, UserOverrides.AssignPartitions, UserOverrides.RemoveNode};
+
+            var map = funcType.GetInterfaceMap(typeof(IAffinityFunction));
+
+            foreach (var method in methods)
+            {
+                // Find whether user type overrides IAffinityFunction method from AffinityFunctionBase.
+                var methodName = method.ToString(); // the enum member name doubles as the method name to look up
+
+                if (map.TargetMethods.Single(x => x.Name == methodName).DeclaringType != typeof(AffinityFunctionBase))
+                    res |= method;
+            }
+
+            return res;
+        }
+
+        /// <summary>
+        /// Writes funcOverride when it is not null; otherwise writes func (may be null), throwing IgniteException if its type is not serializable.
+        /// </summary>
+        private static void WriteUserFunc(IBinaryRawWriter writer, IAffinityFunction func, object funcOverride)
+        {
+            if (funcOverride != null)
+            {
+                writer.WriteObject(funcOverride);
+                return;
+            }
+
+            if (func != null && !func.GetType().IsSerializable)
+                throw new IgniteException("AffinityFunction should be serializable.");
+
+            writer.WriteObject(func);
+        }
+
+        /// <summary>
+        /// Flags naming the IAffinityFunction methods overridden by a user type; serialized as a single byte.
+        /// </summary>
+        [Flags]
+        private enum UserOverrides : byte
+        {
+            None = 0,
+            GetPartition = 1,
+            RemoveNode = 1 << 1,
+            AssignPartitions = 1 << 2,
+            All = GetPartition | RemoveNode | AssignPartitions
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
new file mode 100644
index 0000000..d335804
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Affinity
+{
+    using System;
+    using System.Collections.Generic;
+    using Apache.Ignite.Core.Cache.Affinity;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Impl.Binary;
+    using Apache.Ignite.Core.Impl.Unmanaged;
+
+    /// <summary>
+    /// Affinity function that delegates to Java.
+    /// </summary>
+    internal class PlatformAffinityFunction : PlatformTarget, IAffinityFunction
+    {
+        /** Operation codes passed to DoOutOp / DoInOp. */
+        private enum  Op
+        {
+            Partition = 1,
+            RemoveNode = 2,
+            AssignPartitions = 3
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="PlatformAffinityFunction"/> class.
+        /// </summary>
+        /// <param name="target">Unmanaged target, forwarded to the PlatformTarget constructor.</param>
+        /// <param name="marsh">Marshaller, forwarded to the PlatformTarget constructor.</param>
+        public PlatformAffinityFunction(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh)
+        {
+            // No-op.
+        }
+
+        /** <inheritdoc /> */
+        public int Partitions
+        {
+            get { throw new NotSupportedException("PlatformAffinityFunction.Partitions is not supported."); }
+        }
+
+        /** <inheritdoc /> */
+        public int GetPartition(object key)
+        {
+            return (int) DoOutOp((int) Op.Partition, w => w.WriteObject(key));
+        }
+
+        /** <inheritdoc /> */
+        public void RemoveNode(Guid nodeId)
+        {
+            DoOutOp((int) Op.RemoveNode, w => w.WriteGuid(nodeId));
+        }
+
+        /** <inheritdoc /> */
+        public IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context)
+        { // NOTE(review): context is not written to the operation; only the result is read back. Confirm this is intended.
+            return DoInOp((int) Op.AssignPartitions, s => AffinityFunctionSerializer.ReadPartitions(s, Marshaller));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs
new file mode 100644
index 0000000..407fe0c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+    using System.Collections.Generic;
+    using System.Diagnostics;
+    using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Impl.Binary;
+
+    /// <summary>
+    /// Holds the information to instantiate an object and set its properties.
+    /// Typically used for .NET objects defined in Spring XML.
+    /// </summary>
+    internal class ObjectInfoHolder : IBinaryWriteAware
+    {
+        /** Name of the type to instantiate. */
+        private readonly string _typeName;
+
+        /** Property values handed to IgniteUtils.CreateInstance together with the type name. */
+        private readonly Dictionary<string, object> _properties;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ObjectInfoHolder"/> class.
+        /// </summary>
+        /// <param name="reader">Raw reader supplying the type name string followed by the properties dictionary.</param>
+        public ObjectInfoHolder(IBinaryRawReader reader)
+        {
+            Debug.Assert(reader != null);
+
+            _typeName = reader.ReadString();
+            _properties = reader.ReadDictionaryAsGeneric<string, object>();
+
+            Debug.Assert(!string.IsNullOrEmpty(_typeName));
+        }
+
+        /// <summary>
+        /// Gets the name of the type.
+        /// </summary>
+        public string TypeName
+        {
+            get { return _typeName; }
+        }
+
+        /// <summary>
+        /// Gets the properties.
+        /// </summary>
+        public Dictionary<string, object> Properties
+        {
+            get { return _properties; }
+        }
+
+        /// <summary>
+        /// Creates an instance of type T via IgniteUtils.CreateInstance using TypeName and Properties.
+        /// </summary>
+        public T CreateInstance<T>()
+        {
+            return IgniteUtils.CreateInstance<T>(TypeName, Properties);
+        }
+
+        /** <inheritdoc /> */
+        public void WriteBinary(IBinaryWriter writer)
+        {
+            Debug.Assert(writer != null);
+
+            var w = writer.GetRawWriter();
+
+            w.WriteString(_typeName); // same order as the reading constructor: type name, then properties
+            w.WriteDictionary(_properties);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
index 7929a5d..5d0d989 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
@@ -128,8 +128,9 @@ namespace Apache.Ignite.Core.Impl
         /// Create new instance of specified class.
         /// </summary>
         /// <param name="typeName">Class name</param>
+        /// <param name="props">Optional property name/value pairs to set on the created instance; skipped when null.</param>
         /// <returns>New Instance.</returns>
-        public static T CreateInstance<T>(string typeName)
+        public static T CreateInstance<T>(string typeName, IEnumerable<KeyValuePair<string, object>> props = null)
         {
             IgniteArgumentCheck.NotNullOrEmpty(typeName, "typeName");
 
@@ -138,7 +139,12 @@ namespace Apache.Ignite.Core.Impl
             if (type == null)
                 throw new IgniteException("Failed to create class instance [className=" + typeName + ']');
 
-            return (T) Activator.CreateInstance(type);
+            var res = (T)Activator.CreateInstance(type);
+
+            if (props != null)
+                SetProperties(res, props);
+
+            return res;
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs
index 8147e9d..dd16d03 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs
@@ -95,5 +95,11 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
 
         internal void* extensionCbInLongOutLong;
         internal void* extensionCbInLongLongOutLong;
+
+        internal void* affinityFunctionInit;
+        internal void* affinityFunctionPartition;
+        internal void* affinityFunctionAssignPartitions;
+        internal void* affinityFunctionRemoveNode;
+        internal void* affinityFunctionDestroy;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index e554cfc..f4b3db9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -21,14 +21,16 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
     using System.Collections.Generic;
     using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
+    using System.IO;
     using System.Runtime.InteropServices;
     using System.Threading;
-
+    using Apache.Ignite.Core.Cache.Affinity;
     using Apache.Ignite.Core.Cluster;
     using Apache.Ignite.Core.Common;
     using Apache.Ignite.Core.Impl.Binary;
     using Apache.Ignite.Core.Impl.Binary.IO;
     using Apache.Ignite.Core.Impl.Cache;
+    using Apache.Ignite.Core.Impl.Cache.Affinity;
     using Apache.Ignite.Core.Impl.Cache.Query.Continuous;
     using Apache.Ignite.Core.Impl.Cache.Store;
     using Apache.Ignite.Core.Impl.Common;
@@ -161,6 +163,12 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
         private delegate long ExtensionCallbackInLongOutLongDelegate(void* target, int typ, long arg1);
         private delegate long ExtensionCallbackInLongLongOutLongDelegate(void* target, int typ, long arg1, long arg2);
 
+        private delegate long AffinityFunctionInitDelegate(void* target, long memPtr, void* baseFunc);
+        private delegate int AffinityFunctionPartitionDelegate(void* target, long ptr, long memPtr);
+        private delegate void AffinityFunctionAssignPartitionsDelegate(void* target, long ptr, long inMemPtr, long outMemPtr);
+        private delegate void AffinityFunctionRemoveNodeDelegate(void* target, long ptr, long memPtr);
+        private delegate void AffinityFunctionDestroyDelegate(void* target, long ptr);
+
         /// <summary>
         /// constructor.
         /// </summary>
@@ -240,7 +248,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
                 error = CreateFunctionPointer((ErrorCallbackDelegate)Error),
                 
                 extensionCbInLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongOutLongDelegate)ExtensionCallbackInLongOutLong),
-                extensionCbInLongLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongLongOutLongDelegate)ExtensionCallbackInLongLongOutLong)
+                extensionCbInLongLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongLongOutLongDelegate)ExtensionCallbackInLongLongOutLong),
+                
+                affinityFunctionInit = CreateFunctionPointer((AffinityFunctionInitDelegate)AffinityFunctionInit),
+                affinityFunctionPartition = CreateFunctionPointer((AffinityFunctionPartitionDelegate)AffinityFunctionPartition),
+                affinityFunctionAssignPartitions = CreateFunctionPointer((AffinityFunctionAssignPartitionsDelegate)AffinityFunctionAssignPartitions),
+                affinityFunctionRemoveNode = CreateFunctionPointer((AffinityFunctionRemoveNodeDelegate)AffinityFunctionRemoveNode),
+                affinityFunctionDestroy = CreateFunctionPointer((AffinityFunctionDestroyDelegate)AffinityFunctionDestroy)
             };
 
             _cbsPtr = Marshal.AllocHGlobal(UU.HandlersSize());
@@ -1057,7 +1071,120 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
         }
 
         #endregion
-        
+
+        #region AffinityFunction
+
+        private long AffinityFunctionInit(void* target, long memPtr, void* baseFunc)
+        { // Reads the function from the memory at memPtr and returns the handle allocated for it in _handleRegistry.
+            return SafeCall(() =>
+            {
+                using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+                {
+                    var reader = _ignite.Marshaller.StartUnmarshal(stream);
+
+                    var func = reader.ReadObjectEx<IAffinityFunction>();
+
+                    ResourceProcessor.Inject(func, _ignite);
+
+                    var affBase = func as AffinityFunctionBase;
+
+                    if (affBase != null) // only AffinityFunctionBase subclasses receive a base function wrapping baseFunc
+                        affBase.SetBaseFunction(new PlatformAffinityFunction(
+                            _ignite.InteropProcessor.ChangeTarget(baseFunc), _ignite.Marshaller));
+
+                    return _handleRegistry.Allocate(func);
+                }
+            });
+        }
+
+        private int AffinityFunctionPartition(void* target, long ptr, long memPtr)
+        { // Unmarshals the key from memPtr and returns GetPartition(key) of the function registered under ptr.
+            return SafeCall(() =>
+            {
+                using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+                {
+                    var key = _ignite.Marshaller.Unmarshal<object>(stream);
+
+                    return _handleRegistry.Get<IAffinityFunction>(ptr, true).GetPartition(key);
+                }
+            });
+        }
+
+        private void AffinityFunctionAssignPartitions(void* target, long ptr, long inMemPtr, long outMemPtr)
+        { // Reads the context from inMemPtr, calls AssignPartitions on the function under ptr and writes the result to outMemPtr.
+            SafeCall(() =>
+            {
+                using (var inStream = IgniteManager.Memory.Get(inMemPtr).GetStream())
+                {
+                    var ctx = new AffinityFunctionContext(_ignite.Marshaller.StartUnmarshal(inStream));
+                    var func = _handleRegistry.Get<IAffinityFunction>(ptr, true);
+                    var parts = func.AssignPartitions(ctx);
+
+                    if (parts == null)
+                        throw new IgniteException(func.GetType() + ".AssignPartitions() returned invalid result: null");
+
+                    using (var outStream = IgniteManager.Memory.Get(outMemPtr).GetStream())
+                    {
+                        var writer = _ignite.Marshaller.StartMarshal(outStream);
+
+                        var partCnt = 0;
+                        writer.WriteInt(partCnt);  // placeholder; the real partition count is written after the loop
+
+                        foreach (var part in parts)
+                        {
+                            if (part == null)
+                                throw new IgniteException(func.GetType() +
+                                                          ".AssignPartitions() returned invalid partition: null");
+
+                            partCnt++;
+
+                            var nodeCnt = 0;
+                            var cntPos = outStream.Position;
+                            writer.WriteInt(nodeCnt);  // placeholder; overwritten at cntPos once the nodes are counted
+
+                            foreach (var node in part)
+                            {
+                                nodeCnt++;
+                                writer.WriteGuid(node.Id);
+                            }
+
+                            var endPos = outStream.Position;
+                            outStream.Seek(cntPos, SeekOrigin.Begin);
+                            outStream.WriteInt(nodeCnt);
+                            outStream.Seek(endPos, SeekOrigin.Begin);
+                        }
+
+                        outStream.SynchronizeOutput();
+                        outStream.Seek(0, SeekOrigin.Begin);
+                        writer.WriteInt(partCnt);
+                    }
+                }
+            });
+        }
+
+        private void AffinityFunctionRemoveNode(void* target, long ptr, long memPtr)
+        { // Unmarshals a node id from memPtr and passes it to RemoveNode of the function registered under ptr.
+            SafeCall(() =>
+            {
+                using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+                {
+                    var nodeId = _ignite.Marshaller.Unmarshal<Guid>(stream);
+
+                    _handleRegistry.Get<IAffinityFunction>(ptr, true).RemoveNode(nodeId);
+                }
+            });
+        }
+
+        private void AffinityFunctionDestroy(void* target, long ptr)
+        {
+            SafeCall(() =>
+            {
+                _handleRegistry.Release(ptr); // presumably the handle returned by AffinityFunctionInit; confirm against the caller
+            });
+        }
+
+        #endregion
+
         #region HELPERS
 
         [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]


Mime
View raw message