Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 72E20200B6F for ; Wed, 24 Aug 2016 16:04:28 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 716D0160AB1; Wed, 24 Aug 2016 14:04:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EEAE0160AD1 for ; Wed, 24 Aug 2016 16:04:25 +0200 (CEST) Received: (qmail 95215 invoked by uid 500); 24 Aug 2016 14:04:25 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 95043 invoked by uid 99); 24 Aug 2016 14:04:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Aug 2016 14:04:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CE7DFE5718; Wed, 24 Aug 2016 14:04:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Wed, 24 Aug 2016 14:04:40 -0000 Message-Id: <1351c96fb2444ed5965163a327477df6@git.apache.org> In-Reply-To: <6836123d3ae54a6380a447ec97c23218@git.apache.org> References: <6836123d3ae54a6380a447ec97c23218@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [17/50] [abbrv] ignite git commit: GG-11293: .NET: Backported affinity functions feature to 7.5.30. archived-at: Wed, 24 Aug 2016 14:04:28 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs new file mode 100644 index 0000000..6067af4 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Cache.Affinity +{ + using System.Collections.Generic; + using System.Diagnostics; + using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Events; + using Apache.Ignite.Core.Impl; + + /// + /// Affinity function context. + /// + public class AffinityFunctionContext + { + /** */ + private readonly List> _previousAssignment; + + /** */ + private readonly int _backups; + + /** */ + private readonly ICollection _currentTopologySnapshot; + + /** */ + private readonly AffinityTopologyVersion _currentTopologyVersion; + + /** */ + private readonly DiscoveryEvent _discoveryEvent; + + /// + /// Initializes a new instance of the class. + /// + /// The reader. + internal AffinityFunctionContext(IBinaryRawReader reader) + { + Debug.Assert(reader != null); + + _currentTopologySnapshot = IgniteUtils.ReadNodes(reader); + _backups = reader.ReadInt(); + _currentTopologyVersion = new AffinityTopologyVersion(reader.ReadLong(), reader.ReadInt()); + _discoveryEvent = EventReader.Read(reader); + + // Prev assignment + var cnt = reader.ReadInt(); + + if (cnt > 0) + { + _previousAssignment = new List>(cnt); + + for (var i = 0; i < cnt; i++) + _previousAssignment.Add(IgniteUtils.ReadNodes(reader)); + } + } + + /// + /// Gets the affinity assignment for given partition on previous topology version. + /// First node in returned list is a primary node, other nodes are backups. + /// + /// The partition to get previous assignment for. + /// + /// List of nodes assigned to a given partition on previous topology version or null + /// if this information is not available. + /// + public ICollection GetPreviousAssignment(int partition) + { + return _previousAssignment == null ? null : _previousAssignment[partition]; + } + + /// + /// Gets number of backups for new assignment. + /// + public int Backups + { + get { return _backups; } + } + + /// + /// Gets the current topology snapshot. Snapshot will contain only nodes on which the particular + /// cache is configured. List of passed nodes is guaranteed to be sorted in a same order + /// on all nodes on which partition assignment is performed. + /// + public ICollection CurrentTopologySnapshot + { + get { return _currentTopologySnapshot; } + } + + /// + /// Gets the current topology version. + /// + public AffinityTopologyVersion CurrentTopologyVersion + { + get { return _currentTopologyVersion; } + } + + /// + /// Gets the discovery event that caused the topology change. + /// + public DiscoveryEvent DiscoveryEvent + { + get { return _discoveryEvent; } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs new file mode 100644 index 0000000..9bfdfb4 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Cache.Affinity +{ + using System; + using Apache.Ignite.Core.Cluster; + + /// + /// Affinity topology version. + /// + public struct AffinityTopologyVersion : IEquatable + { + /** */ + private readonly long _version; + + /** */ + private readonly int _minorVersion; + + /// + /// Initializes a new instance of the struct. + /// + /// The version. + /// The minor version. + public AffinityTopologyVersion(long version, int minorVersion) + { + _version = version; + _minorVersion = minorVersion; + } + + /// + /// Gets the major version, same as . + /// + public long Version + { + get { return _version; } + } + + /// + /// Gets the minor version, which is increased when new caches start. + /// + public int MinorVersion + { + get { return _minorVersion; } + } + + /// + /// Indicates whether the current object is equal to another object of the same type. + /// + /// An object to compare with this object. + /// + /// true if the current object is equal to the parameter; otherwise, false. + /// + public bool Equals(AffinityTopologyVersion other) + { + return _version == other._version && _minorVersion == other._minorVersion; + } + + /// + /// Determines whether the specified , is equal to this instance. + /// + /// The to compare with this instance. + /// + /// true if the specified is equal to this instance; otherwise, + /// false. + /// + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + return obj is AffinityTopologyVersion && Equals((AffinityTopologyVersion) obj); + } + + /// + /// Returns a hash code for this instance. + /// + /// + /// A hash code for this instance, suitable for use in hashing algorithms and data structures like a hash table. + /// + public override int GetHashCode() + { + unchecked + { + return (_version.GetHashCode()*397) ^ _minorVersion; + } + } + + /// + /// Implements the operator ==. + /// + /// The left. + /// The right. + /// + /// The result of the operator. + /// + public static bool operator ==(AffinityTopologyVersion left, AffinityTopologyVersion right) + { + return left.Equals(right); + } + + /// + /// Implements the operator !=. + /// + /// The left. + /// The right. + /// + /// The result of the operator. + /// + public static bool operator !=(AffinityTopologyVersion left, AffinityTopologyVersion right) + { + return !left.Equals(right); + } + + /// + /// Returns a that represents this instance. + /// + /// + /// A that represents this instance. + /// + public override string ToString() + { + return string.Format("AffinityTopologyVersion [Version={0}, MinorVersion={1}]", _version, _minorVersion); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs new file mode 100644 index 0000000..4a3885f --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Cache.Affinity.Fair +{ + using System; + + /// + /// Fair affinity function which tries to ensure that all nodes get equal number of partitions with + /// minimum amount of reassignments between existing nodes. + /// + [Serializable] + public class FairAffinityFunction : AffinityFunctionBase + { + // No-op. + // Actual implementation is in Java, see AffinityFunctionSerializer.Write method. + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs new file mode 100644 index 0000000..b6c190c --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Cache.Affinity +{ + using System; + using System.Collections.Generic; + using Apache.Ignite.Core.Cache.Affinity.Fair; + using Apache.Ignite.Core.Cache.Affinity.Rendezvous; + using Apache.Ignite.Core.Cluster; + + /// + /// Represents a function that maps cache keys to cluster nodes. + /// + /// Predefined implementations: + /// , . + /// + public interface IAffinityFunction + { + /// + /// Gets the total number of partitions. + /// + /// All caches should always provide correct partition count which should be the same on all + /// participating nodes. Note that partitions should always be numbered from 0 inclusively + /// to N exclusively without any gaps. + /// + int Partitions { get; } + + /// + /// Gets partition number for a given key starting from 0. Partitioned caches + /// should make sure that keys are about evenly distributed across all partitions + /// from 0 to for best performance. + /// + /// Note that for fully replicated caches it is possible to segment key sets among different + /// grid node groups. In that case each node group should return a unique partition + /// number. However, unlike partitioned cache, mappings of keys to nodes in + /// replicated caches are constant and a node cannot migrate from one partition + /// to another. + /// + /// Key to get partition for. + /// Partition number for a given key. + int GetPartition(object key); + + /// + /// Removes node from affinity. This method is called when it is safe to remove + /// disconnected node from affinity mapping. + /// + /// The node identifier. + void RemoveNode(Guid nodeId); + + /// + /// Gets affinity nodes for a partition. In case of replicated cache, all returned + /// nodes are updated in the same manner. In case of partitioned cache, the returned + /// list should contain only the primary and back up nodes with primary node being + /// always first. + /// + /// Note that partitioned affinity must obey the following contract: given that node + /// N is primary for some key K, if any other node(s) leave + /// grid and no node joins grid, node N will remain primary for key K. + /// + /// The affinity function context. + /// + /// A collection of partitions, where each partition is a collection of nodes, + /// where first node is a primary node, and other nodes are backup nodes. + /// + IEnumerable> AssignPartitions(AffinityFunctionContext context); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs new file mode 100644 index 0000000..98ec364 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Cache.Affinity.Rendezvous +{ + using System; + + /// + /// Affinity function for partitioned cache based on Highest Random Weight algorithm. + /// + [Serializable] + public class RendezvousAffinityFunction : AffinityFunctionBase + { + // No-op. + // Actual implementation is in Java, see AffinityFunctionSerializer.Write method. + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs index cb1c715..ee1c837 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs @@ -32,16 +32,14 @@ namespace Apache.Ignite.Core.Events /// Reader. /// Deserialized event. /// Incompatible event type. - public static T Read(IBinaryReader reader) where T : IEvent + public static T Read(IBinaryRawReader reader) where T : IEvent { - var r = reader.GetRawReader(); - - var clsId = r.ReadInt(); + var clsId = reader.ReadInt(); if (clsId == -1) return default(T); - return (T) CreateInstance(clsId, r); + return (T) CreateInstance(clsId, reader); } /// http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs index 7a3fafc..f6fab5d 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs @@ -26,10 +26,12 @@ namespace Apache.Ignite.Core using System.Runtime; using System.Threading; using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Cache.Affinity; using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Impl; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Binary.IO; + using Apache.Ignite.Core.Impl.Cache.Affinity; using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Handle; using Apache.Ignite.Core.Impl.Memory; @@ -233,6 +235,8 @@ namespace Apache.Ignite.Core PrepareConfiguration(reader); PrepareLifecycleBeans(reader, outStream, handleRegistry); + + PrepareAffinityFunctions(reader, outStream); } catch (Exception e) { @@ -282,7 +286,7 @@ namespace Apache.Ignite.Core int cnt = reader.ReadInt(); for (int i = 0; i < cnt; i++) - beans.Add(new LifecycleBeanHolder(CreateLifecycleBean(reader))); + beans.Add(new LifecycleBeanHolder(CreateObject(reader))); // 2. Append beans definied in local configuration. ICollection nativeBeans = _startup.Configuration.LifecycleBeans; @@ -306,21 +310,33 @@ namespace Apache.Ignite.Core } /// - /// Create lifecycle bean. + /// Prepares the affinity functions. /// - /// Reader. - /// Lifecycle bean. - private static ILifecycleBean CreateLifecycleBean(BinaryReader reader) + private static void PrepareAffinityFunctions(BinaryReader reader, PlatformMemoryStream outStream) { - // 1. Instantiate. - var bean = IgniteUtils.CreateInstance(reader.ReadString()); + var cnt = reader.ReadInt(); + + var writer = reader.Marshaller.StartMarshal(outStream); - // 2. Set properties. - var props = reader.ReadDictionaryAsGeneric(); + for (var i = 0; i < cnt; i++) + { + var objHolder = new ObjectInfoHolder(reader); + AffinityFunctionSerializer.Write(writer, objHolder.CreateInstance(), objHolder); + } + } + + /// + /// Creates an object and sets the properties. + /// + /// Reader. + /// Resulting object. + private static T CreateObject(IBinaryRawReader reader) + { + var res = IgniteUtils.CreateInstance(reader.ReadString()); - IgniteUtils.SetProperties(bean, props); + IgniteUtils.SetProperties(res, reader.ReadDictionaryAsGeneric()); - return bean; + return res; } /// http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs index c3dcc3a..a5446ac 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs @@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Impl.Binary { using System.Collections.Generic; using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Impl.Common; /// /// Reader extensions. @@ -48,5 +49,18 @@ namespace Apache.Ignite.Core.Impl.Binary { return (Dictionary) reader.ReadDictionary(size => new Dictionary(size)); } + + /// + /// Reads the object either as a normal object or as a [typeName+props] wrapper. + /// + public static T ReadObjectEx(this IBinaryRawReader reader) + { + var obj = reader.ReadObject(); + + if (obj == null) + return default(T); + + return obj is T ? (T)obj : ((ObjectInfoHolder)obj).CreateInstance(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs index 81fc195..a63e8f4 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs @@ -506,12 +506,15 @@ namespace Apache.Ignite.Core.Impl.Binary /// /// Adds a predefined system type. /// - private void AddSystemType(byte typeId, Func ctor) where T : IBinaryWriteAware + private void AddSystemType(int typeId, Func ctor) where T : IBinaryWriteAware { var type = typeof(T); var serializer = new BinarySystemTypeSerializer(ctor); + if (typeId == 0) + typeId = BinaryUtils.TypeId(type.Name, null, null); + AddType(type, typeId, BinaryUtils.GetTypeName(type), false, false, null, null, serializer, null, false); } @@ -536,6 +539,7 @@ namespace Apache.Ignite.Core.Impl.Binary AddSystemType(BinaryUtils.TypeMessageListenerHolder, w => new MessageListenerHolder(w)); AddSystemType(BinaryUtils.TypeStreamReceiverHolder, w => new StreamReceiverHolder(w)); AddSystemType(BinaryUtils.TypePlatformJavaObjectFactoryProxy, w => new PlatformJavaObjectFactoryProxy()); + AddSystemType(0, w => new ObjectInfoHolder(w)); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs new file mode 100644 index 0000000..888445a --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache.Affinity +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.IO; + using System.Linq; + using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Cache.Affinity; + using Apache.Ignite.Core.Cache.Affinity.Fair; + using Apache.Ignite.Core.Cache.Affinity.Rendezvous; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Binary.IO; + using Apache.Ignite.Core.Impl.Memory; + + /// + /// Affinity function read/write methods. + /// + internal static class AffinityFunctionSerializer + { + /** */ + private const byte TypeCodeNull = 0; + + /** */ + private const byte TypeCodeFair = 1; + + /** */ + private const byte TypeCodeRendezvous = 2; + + /** */ + private const byte TypeCodeUser = 3; + + /// + /// Writes the instance. + /// + internal static void Write(IBinaryRawWriter writer, IAffinityFunction fun, object userFuncOverride = null) + { + Debug.Assert(writer != null); + + if (fun == null) + { + writer.WriteByte(TypeCodeNull); + return; + } + + // 1) Type code + // 2) Partitions + // 3) ExcludeNeighbors + // 4) Override flags + // 5) User object + + var p = fun as AffinityFunctionBase; + + if (p != null) + { + writer.WriteByte(p is FairAffinityFunction ? TypeCodeFair : TypeCodeRendezvous); + writer.WriteInt(p.Partitions); + writer.WriteBoolean(p.ExcludeNeighbors); + + var overrideFlags = GetOverrideFlags(p.GetType()); + writer.WriteByte((byte) overrideFlags); + + // Do not write user func if there is nothing overridden + WriteUserFunc(writer, overrideFlags != UserOverrides.None ? fun : null, userFuncOverride); + } + else + { + writer.WriteByte(TypeCodeUser); + writer.WriteInt(fun.Partitions); + writer.WriteBoolean(false); // Exclude neighbors + writer.WriteByte((byte) UserOverrides.All); + WriteUserFunc(writer, fun, userFuncOverride); + } + } + + /// + /// Reads the instance. + /// + internal static IAffinityFunction Read(IBinaryRawReader reader) + { + Debug.Assert(reader != null); + + var typeCode = reader.ReadByte(); + + if (typeCode == TypeCodeNull) + return null; + + var partitions = reader.ReadInt(); + var exclNeighbors = reader.ReadBoolean(); + var overrideFlags = (UserOverrides)reader.ReadByte(); + var userFunc = reader.ReadObjectEx(); + + if (userFunc != null) + { + Debug.Assert(overrideFlags != UserOverrides.None); + + var fair = userFunc as FairAffinityFunction; + if (fair != null) + { + fair.Partitions = partitions; + fair.ExcludeNeighbors = exclNeighbors; + } + + var rendezvous = userFunc as RendezvousAffinityFunction; + if (rendezvous != null) + { + rendezvous.Partitions = partitions; + rendezvous.ExcludeNeighbors = exclNeighbors; + } + + return userFunc; + } + + Debug.Assert(overrideFlags == UserOverrides.None); + AffinityFunctionBase fun; + + switch (typeCode) + { + case TypeCodeFair: + fun = new FairAffinityFunction(); + break; + case TypeCodeRendezvous: + fun = new RendezvousAffinityFunction(); + break; + default: + throw new InvalidOperationException("Invalid AffinityFunction type code: " + typeCode); + } + + fun.Partitions = partitions; + fun.ExcludeNeighbors = exclNeighbors; + + return fun; + } + + + /// + /// Writes the partitions assignment to a stream. + /// + /// The parts. + /// The stream. + /// The marshaller. + internal static void WritePartitions(IEnumerable> parts, + PlatformMemoryStream stream, Marshaller marsh) + { + Debug.Assert(parts != null); + Debug.Assert(stream != null); + Debug.Assert(marsh != null); + + IBinaryRawWriter writer = marsh.StartMarshal(stream); + + var partCnt = 0; + writer.WriteInt(partCnt); // reserve size + + foreach (var part in parts) + { + if (part == null) + throw new IgniteException("IAffinityFunction.AssignPartitions() returned invalid partition: null"); + + partCnt++; + + var nodeCnt = 0; + var cntPos = stream.Position; + writer.WriteInt(nodeCnt); // reserve size + + foreach (var node in part) + { + nodeCnt++; + writer.WriteGuid(node.Id); + } + + var endPos = stream.Position; + stream.Seek(cntPos, SeekOrigin.Begin); + stream.WriteInt(nodeCnt); + stream.Seek(endPos, SeekOrigin.Begin); + } + + stream.SynchronizeOutput(); + stream.Seek(0, SeekOrigin.Begin); + writer.WriteInt(partCnt); + } + + /// + /// Reads the partitions assignment from a stream. + /// + /// The stream. + /// The marshaller. + /// Partitions assignment. + internal static IEnumerable> ReadPartitions(IBinaryStream stream, Marshaller marsh) + { + Debug.Assert(stream != null); + Debug.Assert(marsh != null); + + IBinaryRawReader reader = marsh.StartUnmarshal(stream); + + var partCnt = reader.ReadInt(); + + var res = new List>(partCnt); + + for (var i = 0; i < partCnt; i++) + res.Add(IgniteUtils.ReadNodes(reader)); + + return res; + } + + /// + /// Gets the override flags. + /// + private static UserOverrides GetOverrideFlags(Type funcType) + { + var res = UserOverrides.None; + + var methods = new[] {UserOverrides.GetPartition, UserOverrides.AssignPartitions, UserOverrides.RemoveNode}; + + var map = funcType.GetInterfaceMap(typeof(IAffinityFunction)); + + foreach (var method in methods) + { + // Find whether user type overrides IAffinityFunction method from AffinityFunctionBase. + var methodName = method.ToString(); + + if (map.TargetMethods.Single(x => x.Name == methodName).DeclaringType != typeof(AffinityFunctionBase)) + res |= method; + } + + return res; + } + + /// + /// Writes the user function. + /// + private static void WriteUserFunc(IBinaryRawWriter writer, IAffinityFunction func, object funcOverride) + { + if (funcOverride != null) + { + writer.WriteObject(funcOverride); + return; + } + + if (func != null && !func.GetType().IsSerializable) + throw new IgniteException("AffinityFunction should be serializable."); + + writer.WriteObject(func); + } + + /// + /// Overridden function flags. + /// + [Flags] + private enum UserOverrides : byte + { + None = 0, + GetPartition = 1, + RemoveNode = 1 << 1, + AssignPartitions = 1 << 2, + All = GetPartition | RemoveNode | AssignPartitions + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs new file mode 100644 index 0000000..d335804 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Cache.Affinity +{ + using System; + using System.Collections.Generic; + using Apache.Ignite.Core.Cache.Affinity; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Unmanaged; + + /// + /// Affinity function that delegates to Java. + /// + internal class PlatformAffinityFunction : PlatformTarget, IAffinityFunction + { + /** Opcodes. */ + private enum Op + { + Partition = 1, + RemoveNode = 2, + AssignPartitions = 3 + } + + /// + /// Initializes a new instance of the class. + /// + /// Target. + /// Marshaller. + public PlatformAffinityFunction(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh) + { + // No-op. + } + + /** */ + public int Partitions + { + get { throw new NotSupportedException("PlatformAffinityFunction.Partitions is not supported."); } + } + + /** */ + public int GetPartition(object key) + { + return (int) DoOutOp((int) Op.Partition, w => w.WriteObject(key)); + } + + /** */ + public void RemoveNode(Guid nodeId) + { + DoOutOp((int) Op.RemoveNode, w => w.WriteGuid(nodeId)); + } + + /** */ + public IEnumerable> AssignPartitions(AffinityFunctionContext context) + { + return DoInOp((int) Op.AssignPartitions, s => AffinityFunctionSerializer.ReadPartitions(s, Marshaller)); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs new file mode 100644 index 0000000..407fe0c --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Common +{ + using System.Collections.Generic; + using System.Diagnostics; + using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Impl.Binary; + + /// + /// Holds the information to instantiate an object and set its properties. + /// Typically used for .NET objects defined in Spring XML. + /// + internal class ObjectInfoHolder : IBinaryWriteAware + { + /** Type name. */ + private readonly string _typeName; + + /** Properties. */ + private readonly Dictionary _properties; + + /// + /// Initializes a new instance of the class. + /// + /// The reader. + public ObjectInfoHolder(IBinaryRawReader reader) + { + Debug.Assert(reader != null); + + _typeName = reader.ReadString(); + _properties = reader.ReadDictionaryAsGeneric(); + + Debug.Assert(!string.IsNullOrEmpty(_typeName)); + } + + /// + /// Gets the name of the type. + /// + public string TypeName + { + get { return _typeName; } + } + + /// + /// Gets the properties. + /// + public Dictionary Properties + { + get { return _properties; } + } + + /// + /// Creates an instance according to type name and properties. + /// + public T CreateInstance() + { + return IgniteUtils.CreateInstance(TypeName, Properties); + } + + /** */ + public void WriteBinary(IBinaryWriter writer) + { + Debug.Assert(writer != null); + + var w = writer.GetRawWriter(); + + w.WriteString(_typeName); + w.WriteDictionary(_properties); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs index 7929a5d..5d0d989 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs @@ -128,8 +128,9 @@ namespace Apache.Ignite.Core.Impl /// Create new instance of specified class. /// /// Class name + /// Properties to set. /// New Instance. - public static T CreateInstance(string typeName) + public static T CreateInstance(string typeName, IEnumerable> props = null) { IgniteArgumentCheck.NotNullOrEmpty(typeName, "typeName"); @@ -138,7 +139,12 @@ namespace Apache.Ignite.Core.Impl if (type == null) throw new IgniteException("Failed to create class instance [className=" + typeName + ']'); - return (T) Activator.CreateInstance(type); + var res = (T)Activator.CreateInstance(type); + + if (props != null) + SetProperties(res, props); + + return res; } /// http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs index 8147e9d..dd16d03 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs @@ -95,5 +95,11 @@ namespace Apache.Ignite.Core.Impl.Unmanaged internal void* extensionCbInLongOutLong; internal void* extensionCbInLongLongOutLong; + + internal void* affinityFunctionInit; + internal void* affinityFunctionPartition; + internal void* affinityFunctionAssignPartitions; + internal void* affinityFunctionRemoveNode; + internal void* affinityFunctionDestroy; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs index e554cfc..f4b3db9 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs @@ -21,14 +21,16 @@ namespace Apache.Ignite.Core.Impl.Unmanaged using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; + using System.IO; using System.Runtime.InteropServices; using System.Threading; - + using Apache.Ignite.Core.Cache.Affinity; using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Binary.IO; using Apache.Ignite.Core.Impl.Cache; + using Apache.Ignite.Core.Impl.Cache.Affinity; using Apache.Ignite.Core.Impl.Cache.Query.Continuous; using Apache.Ignite.Core.Impl.Cache.Store; using Apache.Ignite.Core.Impl.Common; @@ -161,6 +163,12 @@ namespace Apache.Ignite.Core.Impl.Unmanaged private delegate long ExtensionCallbackInLongOutLongDelegate(void* target, int typ, long arg1); private delegate long ExtensionCallbackInLongLongOutLongDelegate(void* target, int typ, long arg1, long arg2); + private delegate long AffinityFunctionInitDelegate(void* target, long memPtr, void* baseFunc); + private delegate int AffinityFunctionPartitionDelegate(void* target, long ptr, long memPtr); + private delegate void AffinityFunctionAssignPartitionsDelegate(void* target, long ptr, long inMemPtr, long outMemPtr); + private delegate void AffinityFunctionRemoveNodeDelegate(void* target, long ptr, long memPtr); + private delegate void AffinityFunctionDestroyDelegate(void* target, long ptr); + /// /// constructor. /// @@ -240,7 +248,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged error = CreateFunctionPointer((ErrorCallbackDelegate)Error), extensionCbInLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongOutLongDelegate)ExtensionCallbackInLongOutLong), - extensionCbInLongLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongLongOutLongDelegate)ExtensionCallbackInLongLongOutLong) + extensionCbInLongLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongLongOutLongDelegate)ExtensionCallbackInLongLongOutLong), + + affinityFunctionInit = CreateFunctionPointer((AffinityFunctionInitDelegate)AffinityFunctionInit), + affinityFunctionPartition = CreateFunctionPointer((AffinityFunctionPartitionDelegate)AffinityFunctionPartition), + affinityFunctionAssignPartitions = CreateFunctionPointer((AffinityFunctionAssignPartitionsDelegate)AffinityFunctionAssignPartitions), + affinityFunctionRemoveNode = CreateFunctionPointer((AffinityFunctionRemoveNodeDelegate)AffinityFunctionRemoveNode), + affinityFunctionDestroy = CreateFunctionPointer((AffinityFunctionDestroyDelegate)AffinityFunctionDestroy) }; _cbsPtr = Marshal.AllocHGlobal(UU.HandlersSize()); @@ -1057,7 +1071,120 @@ namespace Apache.Ignite.Core.Impl.Unmanaged } #endregion - + + #region AffinityFunction + + private long AffinityFunctionInit(void* target, long memPtr, void* baseFunc) + { + return SafeCall(() => + { + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) + { + var reader = _ignite.Marshaller.StartUnmarshal(stream); + + var func = reader.ReadObjectEx(); + + ResourceProcessor.Inject(func, _ignite); + + var affBase = func as AffinityFunctionBase; + + if (affBase != null) + affBase.SetBaseFunction(new PlatformAffinityFunction( + _ignite.InteropProcessor.ChangeTarget(baseFunc), _ignite.Marshaller)); + + return _handleRegistry.Allocate(func); + } + }); + } + + private int AffinityFunctionPartition(void* target, long ptr, long memPtr) + { + return SafeCall(() => + { + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) + { + var key = _ignite.Marshaller.Unmarshal(stream); + + return _handleRegistry.Get(ptr, true).GetPartition(key); + } + }); + } + + private void AffinityFunctionAssignPartitions(void* target, long ptr, long inMemPtr, long outMemPtr) + { + SafeCall(() => + { + using (var inStream = IgniteManager.Memory.Get(inMemPtr).GetStream()) + { + var ctx = new AffinityFunctionContext(_ignite.Marshaller.StartUnmarshal(inStream)); + var func = _handleRegistry.Get(ptr, true); + var parts = func.AssignPartitions(ctx); + + if (parts == null) + throw new IgniteException(func.GetType() + ".AssignPartitions() returned invalid result: null"); + + using (var outStream = IgniteManager.Memory.Get(outMemPtr).GetStream()) + { + var writer = _ignite.Marshaller.StartMarshal(outStream); + + var partCnt = 0; + writer.WriteInt(partCnt); // reserve size + + foreach (var part in parts) + { + if (part == null) + throw new IgniteException(func.GetType() + + ".AssignPartitions() returned invalid partition: null"); + + partCnt++; + + var nodeCnt = 0; + var cntPos = outStream.Position; + writer.WriteInt(nodeCnt); // reserve size + + foreach (var node in part) + { + nodeCnt++; + writer.WriteGuid(node.Id); + } + + var endPos = outStream.Position; + outStream.Seek(cntPos, SeekOrigin.Begin); + outStream.WriteInt(nodeCnt); + outStream.Seek(endPos, SeekOrigin.Begin); + } + + outStream.SynchronizeOutput(); + outStream.Seek(0, SeekOrigin.Begin); + writer.WriteInt(partCnt); + } + } + }); + } + + private void AffinityFunctionRemoveNode(void* target, long ptr, long memPtr) + { + SafeCall(() => + { + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) + { + var nodeId = _ignite.Marshaller.Unmarshal(stream); + + _handleRegistry.Get(ptr, true).RemoveNode(nodeId); + } + }); + } + + private void AffinityFunctionDestroy(void* target, long ptr) + { + SafeCall(() => + { + _handleRegistry.Release(ptr); + }); + } + + #endregion + #region HELPERS [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]