Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 82C5C1840C for ; Wed, 9 Mar 2016 14:17:53 +0000 (UTC) Received: (qmail 28636 invoked by uid 500); 9 Mar 2016 14:17:52 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 28493 invoked by uid 500); 9 Mar 2016 14:17:49 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 28431 invoked by uid 99); 9 Mar 2016 14:17:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Mar 2016 14:17:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 608DADFC56; Wed, 9 Mar 2016 14:17:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Wed, 09 Mar 2016 14:17:48 -0000 Message-Id: <3b1c2cee4025498ca46a527c01e64d02@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [4/6] ignite git commit: IGNITE-2702: .NET: Implemented compact footers optimization for binary serialization. This closes #523. http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectSchemaSerializer.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectSchemaSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectSchemaSerializer.cs new file mode 100644 index 0000000..da86c07 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryObjectSchemaSerializer.cs @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Binary +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.IO; + using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Impl.Binary.IO; + + /// + /// Schema reader/writer. + /// + internal static class BinaryObjectSchemaSerializer + { + /// + /// Converts schema fields to dictionary. + /// + /// The fields. + /// Fields as dictionary. + public static Dictionary ToDictionary(this BinaryObjectSchemaField[] fields) + { + if (fields == null) + return null; + + var res = new Dictionary(fields.Length); + + foreach (var field in fields) + res[field.Id] = field.Offset; + + return res; + } + + /// + /// Reads the schema according to this header data. + /// + /// The stream. + /// The position. + /// The header. + /// The schema. + /// The marshaller. + /// + /// Schema. + /// + public static BinaryObjectSchemaField[] ReadSchema(IBinaryStream stream, int position, BinaryObjectHeader hdr, + BinaryObjectSchema schema, Marshaller marsh) + { + Debug.Assert(stream != null); + Debug.Assert(schema != null); + Debug.Assert(marsh != null); + + return ReadSchema(stream, position, hdr, () => GetFieldIds(hdr, schema, marsh)); + } + + /// + /// Reads the schema according to this header data. + /// + /// The stream. + /// The position. + /// The header. + /// The field ids function. + /// + /// Schema. + /// + public static BinaryObjectSchemaField[] ReadSchema(IBinaryStream stream, int position, BinaryObjectHeader hdr, + Func fieldIdsFunc) + { + Debug.Assert(stream != null); + Debug.Assert(fieldIdsFunc != null); + + var schemaSize = hdr.SchemaFieldCount; + + if (schemaSize == 0) + return null; + + stream.Seek(position + hdr.SchemaOffset, SeekOrigin.Begin); + + var res = new BinaryObjectSchemaField[schemaSize]; + + var offsetSize = hdr.SchemaFieldOffsetSize; + + if (hdr.IsCompactFooter) + { + var fieldIds = fieldIdsFunc(); + + Debug.Assert(fieldIds.Length == schemaSize); + + if (offsetSize == 1) + { + for (var i = 0; i < schemaSize; i++) + res[i] = new BinaryObjectSchemaField(fieldIds[i], stream.ReadByte()); + + } + else if (offsetSize == 2) + { + for (var i = 0; i < schemaSize; i++) + res[i] = new BinaryObjectSchemaField(fieldIds[i], stream.ReadShort()); + } + else + { + for (var i = 0; i < schemaSize; i++) + res[i] = new BinaryObjectSchemaField(fieldIds[i], stream.ReadInt()); + } + } + else + { + if (offsetSize == 1) + { + for (var i = 0; i < schemaSize; i++) + res[i] = new BinaryObjectSchemaField(stream.ReadInt(), stream.ReadByte()); + } + else if (offsetSize == 2) + { + for (var i = 0; i < schemaSize; i++) + res[i] = new BinaryObjectSchemaField(stream.ReadInt(), stream.ReadShort()); + } + else + { + for (var i = 0; i < schemaSize; i++) + res[i] = new BinaryObjectSchemaField(stream.ReadInt(), stream.ReadInt()); + } + } + + return res; + } + + /// + /// Writes an array of fields to a stream. + /// + /// Fields. + /// Stream. + /// Offset in the array. + /// Field count to write. + /// Compact mode without field ids. + /// + /// Flags according to offset sizes: , + /// , or 0. + /// + public static unsafe BinaryObjectHeader.Flag WriteSchema(BinaryObjectSchemaField[] fields, IBinaryStream stream, int offset, + int count, bool compact) + { + Debug.Assert(fields != null); + Debug.Assert(stream != null); + Debug.Assert(count > 0); + Debug.Assert(offset >= 0); + Debug.Assert(offset < fields.Length); + + unchecked + { + // Last field is the farthest in the stream + var maxFieldOffset = fields[offset + count - 1].Offset; + + if (compact) + { + if (maxFieldOffset <= byte.MaxValue) + { + for (int i = offset; i < count + offset; i++) + stream.WriteByte((byte)fields[i].Offset); + + return BinaryObjectHeader.Flag.OffsetOneByte; + } + + if (maxFieldOffset <= ushort.MaxValue) + { + for (int i = offset; i < count + offset; i++) + stream.WriteShort((short)fields[i].Offset); + + return BinaryObjectHeader.Flag.OffsetTwoBytes; + } + + for (int i = offset; i < count + offset; i++) + stream.WriteInt(fields[i].Offset); + } + else + { + if (maxFieldOffset <= byte.MaxValue) + { + for (int i = offset; i < count + offset; i++) + { + var field = fields[i]; + + stream.WriteInt(field.Id); + stream.WriteByte((byte)field.Offset); + } + + return BinaryObjectHeader.Flag.OffsetOneByte; + } + + if (maxFieldOffset <= ushort.MaxValue) + { + for (int i = offset; i < count + offset; i++) + { + var field = fields[i]; + + stream.WriteInt(field.Id); + + stream.WriteShort((short)field.Offset); + } + + return BinaryObjectHeader.Flag.OffsetTwoBytes; + } + + if (BitConverter.IsLittleEndian) + { + fixed (BinaryObjectSchemaField* ptr = &fields[offset]) + { + stream.Write((byte*)ptr, count / BinaryObjectSchemaField.Size); + } + } + else + { + for (int i = offset; i < count + offset; i++) + { + var field = fields[i]; + + stream.WriteInt(field.Id); + stream.WriteInt(field.Offset); + } + } + } + + + return BinaryObjectHeader.Flag.None; + } + } + + /// + /// Gets the field ids. + /// + private static int[] GetFieldIds(BinaryObjectHeader hdr, BinaryObjectSchema schema, Marshaller marsh) + { + var fieldIds = schema.Get(hdr.SchemaId); + + if (fieldIds == null) + { + if (marsh.Ignite != null) + fieldIds = marsh.Ignite.ClusterGroup.GetSchema(hdr.TypeId, hdr.SchemaId); + + if (fieldIds == null) + throw new BinaryObjectException("Cannot find schema for object with compact footer [" + + "typeId=" + hdr.TypeId + ", schemaId=" + hdr.SchemaId + ']'); + } + return fieldIds; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs index 16aae93..21c1642 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs @@ -718,16 +718,7 @@ namespace Apache.Ignite.Core.Impl.Binary // Set new frame. _curHdr = hdr; _curPos = pos; - - _curSchema = desc.Schema.Get(hdr.SchemaId); - - if (_curSchema == null) - { - _curSchema = ReadSchema(); - - desc.Schema.Add(hdr.SchemaId, _curSchema); - } - + SetCurSchema(desc); _curStruct = new BinaryStructureTracker(desc, desc.ReaderTypeStructure); _curRaw = false; @@ -790,10 +781,40 @@ namespace Apache.Ignite.Core.Impl.Binary } /// + /// Sets the current schema. + /// + private void SetCurSchema(IBinaryTypeDescriptor desc) + { + if (_curHdr.HasSchema) + { + _curSchema = desc.Schema.Get(_curHdr.SchemaId); + + if (_curSchema == null) + { + _curSchema = ReadSchema(); + + desc.Schema.Add(_curHdr.SchemaId, _curSchema); + } + } + } + + /// /// Reads the schema. /// private int[] ReadSchema() { + if (_curHdr.IsCompactFooter) + { + // Get schema from Java + var schema = Marshaller.Ignite.ClusterGroup.GetSchema(_curHdr.TypeId, _curHdr.SchemaId); + + if (schema == null) + throw new BinaryObjectException("Cannot find schema for object with compact footer [" + + "typeId=" + _curHdr.TypeId + ", schemaId=" + _curHdr.SchemaId + ']'); + + return schema; + } + Stream.Seek(_curPos + _curHdr.SchemaOffset, SeekOrigin.Begin); var count = _curHdr.SchemaFieldCount; @@ -929,9 +950,10 @@ namespace Apache.Ignite.Core.Impl.Binary if (_curSchema == null || actionId >= _curSchema.Length || fieldId != _curSchema[actionId]) { - _curSchema = null; // read order is different, ignore schema for future reads + _curSchemaMap = _curSchemaMap ?? BinaryObjectSchemaSerializer.ReadSchema(Stream, _curPos, _curHdr, + () => _curSchema).ToDictionary(); - _curSchemaMap = _curSchemaMap ?? _curHdr.ReadSchemaAsDictionary(Stream, _curPos); + _curSchema = null; // read order is different, ignore schema for future reads int pos; http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs index 5b1273e..47bc2b6 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs @@ -1137,6 +1137,9 @@ namespace Apache.Ignite.Core.Impl.Binary ? BinaryObjectHeader.Flag.UserType : BinaryObjectHeader.Flag.None; + if (Marshaller.CompactFooter && desc.UserType) + flags |= BinaryObjectHeader.Flag.CompactFooter; + var hasSchema = _schema.WriteSchema(_stream, schemaIdx, out schemaId, ref flags); if (hasSchema) @@ -1146,6 +1149,10 @@ namespace Apache.Ignite.Core.Impl.Binary // Calculate and write header. if (_curRawPos > 0) _stream.WriteInt(_curRawPos - pos); // raw offset is in the last 4 bytes + + // Update schema in type descriptor + if (desc.Schema.Get(schemaId) == null) + desc.Schema.Add(schemaId, _schema.GetSchema(schemaIdx)); } else schemaOffset = BinaryObjectHeader.Size; @@ -1451,18 +1458,7 @@ namespace Apache.Ignite.Core.Impl.Binary BinaryType meta; if (_metas.TryGetValue(desc.TypeId, out meta)) - { - if (fields != null) - { - IDictionary existingFields = meta.GetFieldsMap(); - - foreach (KeyValuePair field in fields) - { - if (!existingFields.ContainsKey(field.Key)) - existingFields[field.Key] = field.Value; - } - } - } + meta.UpdateFields(fields); else _metas[desc.TypeId] = new BinaryType(desc, fields); } http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs index 538fbcf..1a01f2c 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs @@ -66,6 +66,8 @@ namespace Apache.Ignite.Core.Impl.Binary if (cfg == null) cfg = new BinaryConfiguration(); + CompactFooter = cfg.CompactFooter; + if (cfg.TypeConfigurations == null) cfg.TypeConfigurations = new List(); @@ -107,6 +109,11 @@ namespace Apache.Ignite.Core.Impl.Binary public Ignite Ignite { get; set; } /// + /// Gets the compact footer flag. + /// + public bool CompactFooter { get; set; } + + /// /// Marshal object. /// /// Value. @@ -281,15 +288,14 @@ namespace Apache.Ignite.Core.Impl.Binary /// Puts the binary type metadata to Ignite. /// /// Descriptor. - /// Fields. - public void PutBinaryType(IBinaryTypeDescriptor desc, IDictionary fields = null) + public void PutBinaryType(IBinaryTypeDescriptor desc) { Debug.Assert(desc != null); GetBinaryTypeHandler(desc); // ensure that handler exists if (Ignite != null) - Ignite.PutBinaryTypes(new[] {new BinaryType(desc, fields)}); + Ignite.PutBinaryTypes(new[] {new BinaryType(desc)}); } /// http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryType.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryType.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryType.cs index 28dfb1a..cb0d3cd 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryType.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryType.cs @@ -18,6 +18,7 @@ namespace Apache.Ignite.Core.Impl.Binary.Metadata { using System.Collections.Generic; + using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using Apache.Ignite.Core.Binary; @@ -54,6 +55,9 @@ namespace Apache.Ignite.Core.Impl.Binary.Metadata /** Aff key field name. */ private readonly string _affinityKeyFieldName; + /** Type descriptor. */ + private readonly IBinaryTypeDescriptor _descriptor; + /// /// Initializes the class. /// @@ -129,7 +133,7 @@ namespace Apache.Ignite.Core.Impl.Binary.Metadata public BinaryType(IBinaryTypeDescriptor desc, IDictionary fields = null) : this (desc.TypeId, desc.TypeName, fields, desc.AffinityKeyFieldName, desc.IsEnum) { - // No-op. + _descriptor = desc; } /// @@ -211,6 +215,14 @@ namespace Apache.Ignite.Core.Impl.Binary.Metadata } /// + /// Gets the descriptor. + /// + public IBinaryTypeDescriptor Descriptor + { + get { return _descriptor; } + } + + /// /// Gets fields map. /// /// Fields map. @@ -218,5 +230,19 @@ namespace Apache.Ignite.Core.Impl.Binary.Metadata { return _fields ?? EmptyDict; } + + /// + /// Updates the fields. + /// + public void UpdateFields(IDictionary fields) + { + if (fields == null || fields.Count == 0) + return; + + Debug.Assert(_fields != null); + + foreach (var field in fields) + _fields[field.Key] = field.Value; + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs index fc673a6..e6c0005 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs @@ -96,6 +96,9 @@ namespace Apache.Ignite.Core.Impl.Cluster /** */ private const int OpTopology = 14; + /** */ + private const int OpSchema = 15; + /** Initial Ignite instance. */ private readonly Ignite _ignite; @@ -570,5 +573,17 @@ namespace Apache.Ignite.Core.Impl.Cluster return res; }); } + + /// + /// Gets the schema. + /// + public int[] GetSchema(int typeId, int schemaId) + { + return DoOutInOp(OpSchema, writer => + { + writer.WriteInt(typeId); + writer.WriteInt(schemaId); + }); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs index 0271fa2..1735fb8 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs @@ -15,6 +15,7 @@ * limitations under the License. */ +#pragma warning disable 618 // SpringConfigUrl namespace Apache.Ignite.Core.Impl { using System; @@ -135,6 +136,24 @@ namespace Apache.Ignite.Core.Impl // Set reconnected task to completed state for convenience. _clientReconnectTaskCompletionSource.SetResult(false); + + SetCompactFooter(); + } + + /// + /// Sets the compact footer setting. + /// + private void SetCompactFooter() + { + if (!string.IsNullOrEmpty(_cfg.SpringConfigUrl)) + { + // If there is a Spring config, use setting from Spring, + // since we ignore .NET config in legacy mode. + var cfg0 = GetConfiguration().BinaryConfiguration; + + if (cfg0 != null) + _marsh.CompactFooter = cfg0.CompactFooter; + } } /// http://git-wip-us.apache.org/repos/asf/ignite/blob/c6c93892/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs index c4258bd..26b6033 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs @@ -604,32 +604,53 @@ namespace Apache.Ignite.Core.Impl { DoOutOp(OpMeta, stream => { - BinaryWriter metaWriter = _marsh.StartMarshal(stream); + BinaryWriter w = _marsh.StartMarshal(stream); - metaWriter.WriteInt(types.Count); + w.WriteInt(types.Count); foreach (var meta in types) { - BinaryType meta0 = meta; + w.WriteInt(meta.TypeId); + w.WriteString(meta.TypeName); + w.WriteString(meta.AffinityKeyFieldName); - metaWriter.WriteInt(meta0.TypeId); - metaWriter.WriteString(meta0.TypeName); - metaWriter.WriteString(meta0.AffinityKeyFieldName); + IDictionary fields = meta.GetFieldsMap(); - IDictionary fields = meta0.GetFieldsMap(); - - metaWriter.WriteInt(fields.Count); + w.WriteInt(fields.Count); foreach (var field in fields) { - metaWriter.WriteString(field.Key); - metaWriter.WriteInt(field.Value); + w.WriteString(field.Key); + w.WriteInt(field.Value); + } + + w.WriteBoolean(meta.IsEnum); + + // Send schemas + var desc = meta.Descriptor; + Debug.Assert(desc != null); + + var count = 0; + var countPos = stream.Position; + w.WriteInt(0); // Reserve for count + + foreach (var schema in desc.Schema.GetAll()) + { + w.WriteInt(schema.Key); + + var ids = schema.Value; + w.WriteInt(ids.Length); + + foreach (var id in ids) + w.WriteInt(id); + + count++; } - metaWriter.WriteBoolean(meta.IsEnum); + stream.WriteInt(countPos, count); } - _marsh.FinishMarshal(metaWriter); + _marsh.FinishMarshal(w); }); _marsh.OnBinaryTypesSent(types);