Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8425F17F8D for ; Fri, 30 Oct 2015 06:21:59 +0000 (UTC) Received: (qmail 98935 invoked by uid 500); 30 Oct 2015 06:21:59 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 98871 invoked by uid 500); 30 Oct 2015 06:21:59 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 98781 invoked by uid 99); 30 Oct 2015 06:21:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Oct 2015 06:21:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 37E95DFB8D; Fri, 30 Oct 2015 06:21:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Fri, 30 Oct 2015 06:22:00 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/6] ignite git commit: IGNITE-1770: DotNet part. IGNITE-1770: DotNet part. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ebb59902 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ebb59902 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ebb59902 Branch: refs/heads/ignite-1803 Commit: ebb59902af1cd71e29177a464cffb2fac66b1386 Parents: 42da3fa Author: Pavel Tupitsyn Authored: Thu Oct 29 18:56:42 2015 +0300 Committer: vozerov-gridgain Committed: Thu Oct 29 18:56:42 2015 +0300 ---------------------------------------------------------------------- .../Cache/Store/CacheTestStore.cs | 55 ++- .../Apache.Ignite.Core.csproj | 4 + .../Apache.Ignite.Core/Impl/Common/Fnv1Hash.cs | 54 +++ .../Impl/Common/ResizeableArray.cs | 56 +++ .../Impl/Memory/PlatformMemoryStream.cs | 35 +- .../Impl/Portable/Io/IPortableStream.cs | 8 +- .../Impl/Portable/Io/PortableAbstractStream.cs | 72 +--- .../Impl/Portable/Io/PortableHeapStream.cs | 99 +++--- .../Impl/Portable/PortableBuilderImpl.cs | 145 ++++---- .../Impl/Portable/PortableMarshaller.cs | 2 +- .../Impl/Portable/PortableObjectHeader.cs | 343 +++++++++++++++++++ .../Impl/Portable/PortableObjectSchemaField.cs | 110 ++++++ .../Impl/Portable/PortableReaderImpl.cs | 127 +++---- .../Impl/Portable/PortableUserObject.cs | 82 ++--- .../Impl/Portable/PortableUtils.cs | 103 ++---- .../Impl/Portable/PortableWriterImpl.cs | 164 +++------ .../Impl/Portable/PortablesImpl.cs | 16 +- 17 files changed, 922 insertions(+), 553 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ebb59902/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs index d5e2e5f..9c381cb 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs @@ -61,35 +61,32 @@ namespace Apache.Ignite.Core.Tests.Cache.Store public void LoadCache(Action act, params object[] args) { - throw new Exception("Cache load failed."); -// -// -// Debug.Assert(_grid != null); -// -// if (LoadMultithreaded) -// { -// int cnt = 0; -// -// TestUtils.RunMultiThreaded(() => { -// int i; -// -// while ((i = Interlocked.Increment(ref cnt) - 1) < 1000) -// act(i, "val_" + i); -// }, 8); -// } -// else -// { -// int start = (int)args[0]; -// int cnt = (int)args[1]; -// -// for (int i = start; i < start + cnt; i++) -// { -// if (LoadObjects) -// act(new Key(i), new Value(i)); -// else -// act(i, "val_" + i); -// } -// } + Debug.Assert(_grid != null); + + if (LoadMultithreaded) + { + int cnt = 0; + + TestUtils.RunMultiThreaded(() => { + int i; + + while ((i = Interlocked.Increment(ref cnt) - 1) < 1000) + act(i, "val_" + i); + }, 8); + } + else + { + int start = (int)args[0]; + int cnt = (int)args[1]; + + for (int i = start; i < start + cnt; i++) + { + if (LoadObjects) + act(new Key(i), new Value(i)); + else + act(i, "val_" + i); + } + } } public object Load(object key) http://git-wip-us.apache.org/repos/asf/ignite/blob/ebb59902/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index e4450b6..9f2c6bd 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -171,6 +171,7 @@ + @@ -179,6 +180,7 @@ + @@ -253,6 +255,8 @@ + + http://git-wip-us.apache.org/repos/asf/ignite/blob/ebb59902/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Fnv1Hash.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Fnv1Hash.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Fnv1Hash.cs new file mode 100644 index 0000000..26bbe7c --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Fnv1Hash.cs @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Common +{ + /// + /// Fowler–Noll–Vo hash function. + /// + internal static class Fnv1Hash + { + /** Basis. */ + public const int Basis = unchecked((int) 0x811C9DC5); + + /** Prime. */ + public const int Prime = 0x01000193; + + /// + /// Updates the hashcode with next int. + /// + /// The current. + /// The next. + /// Updated hashcode. + public static int Update(int current, int next) + { + current = current ^ (next & 0xFF); + current = current * Prime; + + current = current ^ ((next >> 8) & 0xFF); + current = current * Prime; + + current = current ^ ((next >> 16) & 0xFF); + current = current * Prime; + + current = current ^ ((next >> 24) & 0xFF); + current = current * Prime; + + return current; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ebb59902/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ResizeableArray.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ResizeableArray.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ResizeableArray.cs new file mode 100644 index 0000000..3555dc5 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ResizeableArray.cs @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Common +{ + using System.Collections.Generic; + + /// + /// Simple append-only alternative which exposes internal array. + /// + internal class ResizeableArray + { + /** Array. */ + private T[] _arr; + + /** Items count. */ + private int _count; + + public ResizeableArray(int capacity) + { + _arr = new T[capacity]; + } + + public T[] Array + { + get { return _arr; } + } + + public int Count + { + get { return _count; } + } + + public void Add(T element) + { + if (_count == _arr.Length) + System.Array.Resize(ref _arr, _arr.Length*2); + + _arr[_count++] = element; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ebb59902/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryStream.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryStream.cs index b7ea4d6..44766c2 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryStream.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Memory/PlatformMemoryStream.cs @@ -21,6 +21,7 @@ namespace Apache.Ignite.Core.Impl.Memory using System.Diagnostics.CodeAnalysis; using System.IO; using System.Text; + using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Portable.IO; /// @@ -92,6 +93,8 @@ namespace Apache.Ignite.Core.Impl.Memory /** */ public void WriteByteArray(byte[] val) { + IgniteArgumentCheck.NotNull(val, "val"); + fixed (byte* val0 = val) { CopyFromAndShift(val0, val.Length); @@ -107,6 +110,8 @@ namespace Apache.Ignite.Core.Impl.Memory /** */ public void WriteBoolArray(bool[] val) { + IgniteArgumentCheck.NotNull(val, "val"); + fixed (bool* val0 = val) { CopyFromAndShift((byte*)val0, val.Length); @@ -124,6 +129,8 @@ namespace Apache.Ignite.Core.Impl.Memory /** */ public virtual void WriteShortArray(short[] val) { + IgniteArgumentCheck.NotNull(val, "val"); + fixed (short* val0 = val) { CopyFromAndShift((byte*)val0, val.Length << Shift2); @@ -141,6 +148,8 @@ namespace Apache.Ignite.Core.Impl.Memory /** */ public virtual void WriteCharArray(char[] val) { + IgniteArgumentCheck.NotNull(val, "val"); + fixed (char* val0 = val) { CopyFromAndShift((byte*)val0, val.Length << Shift2); @@ -167,6 +176,8 @@ namespace Apache.Ignite.Core.Impl.Memory /** */ public virtual void WriteIntArray(int[] val) { + IgniteArgumentCheck.NotNull(val, "val"); + fixed (int* val0 = val) { CopyFromAndShift((byte*)val0, val.Length << Shift4); @@ -184,6 +195,8 @@ namespace Apache.Ignite.Core.Impl.Memory /** */ public virtual void WriteLongArray(long[] val) { + IgniteArgumentCheck.NotNull(val, "val"); + fixed (long* val0 = val) { CopyFromAndShift((byte*)val0, val.Length << Shift8); @@ -201,6 +214,8 @@ namespace Apache.Ignite.Core.Impl.Memory /** */ public virtual void WriteFloatArray(float[] val) { + IgniteArgumentCheck.NotNull(val, "val"); + fixed (float* val0 = val) { CopyFromAndShift((byte*)val0, val.Length << Shift4); @@ -218,6 +233,8 @@ namespace Apache.Ignite.Core.Impl.Memory /** */ public virtual void WriteDoubleArray(double[] val) { + IgniteArgumentCheck.NotNull(val, "val"); + fixed (double* val0 = val) { CopyFromAndShift((byte*)val0, val.Length << Shift8); @@ -227,6 +244,9 @@ namespace Apache.Ignite.Core.Impl.Memory /** */ public int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding) { + IgniteArgumentCheck.NotNull(charCnt, "charCnt"); + IgniteArgumentCheck.NotNull(byteCnt, "byteCnt"); + int curPos = EnsureWriteCapacityAndShift(byteCnt); return encoding.GetBytes(chars, charCnt, _data + curPos, byteCnt); @@ -235,6 +255,8 @@ namespace Apache.Ignite.Core.Impl.Memory /** */ public void Write(byte[] src, int off, int cnt) { + IgniteArgumentCheck.NotNull(src, "src"); + fixed (byte* src0 = src) { CopyFromAndShift(src0 + off, cnt); @@ -260,7 +282,6 @@ namespace Apache.Ignite.Core.Impl.Memory } /** */ - public byte[] ReadByteArray(int cnt) { int curPos = EnsureReadCapacityAndShift(cnt); @@ -423,6 +444,8 @@ namespace Apache.Ignite.Core.Impl.Memory /** */ public void Read(byte[] dest, int off, int cnt) { + IgniteArgumentCheck.NotNull(dest, "dest"); + fixed (byte* dest0 = dest) { Read(dest0 + off, cnt); @@ -633,9 +656,9 @@ namespace Apache.Ignite.Core.Impl.Memory } /** */ - public int Remaining() + public int Remaining { - return _len - _pos; + get { return _len - _pos; } } /** */ @@ -675,13 +698,13 @@ namespace Apache.Ignite.Core.Impl.Memory #region ARRAYS /** */ - public byte[] Array() + public byte[] GetArray() { - return ArrayCopy(); + return GetArrayCopy(); } /** */ - public byte[] ArrayCopy() + public byte[] GetArrayCopy() { byte[] res = new byte[_mem.Length]; http://git-wip-us.apache.org/repos/asf/ignite/blob/ebb59902/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/IPortableStream.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/IPortableStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/IPortableStream.cs index 73d5a51..80087e4 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/IPortableStream.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/IPortableStream.cs @@ -289,20 +289,20 @@ namespace Apache.Ignite.Core.Impl.Portable.IO /// /// Gets remaining bytes in the stream. /// - /// Remaining bytes. - int Remaining(); + /// Remaining bytes. + int Remaining { get; } /// /// Gets underlying array, avoiding copying if possible. /// /// Underlying array. - byte[] Array(); + byte[] GetArray(); /// /// Gets underlying data in a new array. /// /// New array with data. - byte[] ArrayCopy(); + byte[] GetArrayCopy(); /// /// Check whether array passed as argument is the same as the stream hosts. http://git-wip-us.apache.org/repos/asf/ignite/blob/ebb59902/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableAbstractStream.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableAbstractStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableAbstractStream.cs index f84b5a3..0cd3342 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableAbstractStream.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableAbstractStream.cs @@ -18,28 +18,16 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { using System; - using System.Diagnostics.CodeAnalysis; using System.IO; - using System.Reflection; using System.Text; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Memory; /// /// Base class for managed and unmanaged data streams. /// internal unsafe abstract class PortableAbstractStream : IPortableStream { - /// - /// Array copy delegate. - /// - delegate void MemCopy(byte* a1, byte* a2, int len); - - /** memcpy function handle. */ - private static readonly MemCopy Memcpy; - - /** Whether src and dest arguments are inverted. */ - [SuppressMessage("Microsoft.Performance", "CA1802:UseLiteralsWhereAppropriate")] - private static readonly bool MemcpyInverted; - /** Byte: zero. */ private const byte ByteZero = 0; @@ -56,37 +44,6 @@ namespace Apache.Ignite.Core.Impl.Portable.IO private bool _disposed; /// - /// Static initializer. - /// - [SuppressMessage("Microsoft.Design", "CA1065:DoNotRaiseExceptionsInUnexpectedLocations")] - [SuppressMessage("Microsoft.Performance", "CA1810:InitializeReferenceTypeStaticFieldsInline")] - static PortableAbstractStream() - { - Type type = typeof(Buffer); - - const BindingFlags flags = BindingFlags.Static | BindingFlags.NonPublic; - Type[] paramTypes = { typeof(byte*), typeof(byte*), typeof(int) }; - - // Assume .Net 4.5. - MethodInfo mthd = type.GetMethod("Memcpy", flags, null, paramTypes, null); - - MemcpyInverted = true; - - if (mthd == null) - { - // Assume .Net 4.0. - mthd = type.GetMethod("memcpyimpl", flags, null, paramTypes, null); - - MemcpyInverted = false; - - if (mthd == null) - throw new InvalidOperationException("Unable to get memory copy function delegate."); - } - - Memcpy = (MemCopy)Delegate.CreateDelegate(typeof(MemCopy), mthd); - } - - /// /// Write byte. /// /// Byte value. @@ -1076,15 +1033,15 @@ namespace Apache.Ignite.Core.Impl.Portable.IO /// /// Internal read routine. /// + /// Source /// Destination. /// Count. - /// Data (source). /// Amount of bytes written. - protected void ReadInternal(byte* dest, int cnt, byte* data) + protected void ReadInternal(byte* src, byte* dest, int cnt) { - int cnt0 = Math.Min(Remaining(), cnt); + int cnt0 = Math.Min(Remaining, cnt); - CopyMemory(data + Pos, dest, cnt0); + CopyMemory(src + Pos, dest, cnt0); ShiftRead(cnt0); } @@ -1100,10 +1057,10 @@ namespace Apache.Ignite.Core.Impl.Portable.IO /// /// Gets remaining bytes in the stream. /// - /// - /// Remaining bytes. - /// - public abstract int Remaining(); + /// + /// Remaining bytes. + /// + public abstract int Remaining { get; } /// /// Gets underlying array, avoiding copying if possible. @@ -1111,7 +1068,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO /// /// Underlying array. /// - public abstract byte[] Array(); + public abstract byte[] GetArray(); /// /// Gets underlying data in a new array. @@ -1119,7 +1076,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO /// /// New array with data. /// - public abstract byte[] ArrayCopy(); + public abstract byte[] GetArrayCopy(); /// /// Check whether array passed as argument is the same as the stream hosts. @@ -1291,10 +1248,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO /// Length. private static void CopyMemory(byte* src, byte* dest, int len) { - if (MemcpyInverted) - Memcpy.Invoke(dest, src, len); - else - Memcpy.Invoke(src, dest, len); + PlatformMemoryUtils.CopyMemory(src, dest, len); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ebb59902/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableHeapStream.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableHeapStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableHeapStream.cs index 690f92c..b7d001e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableHeapStream.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableHeapStream.cs @@ -18,8 +18,11 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { using System; + using System.Diagnostics; using System.IO; using System.Text; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Memory; /// /// Portable onheap stream. @@ -27,7 +30,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO internal unsafe class PortableHeapStream : PortableAbstractStream { /** Data array. */ - protected byte[] Data; + private byte[] _data; /// /// Constructor. @@ -35,7 +38,9 @@ namespace Apache.Ignite.Core.Impl.Portable.IO /// Initial capacity. public PortableHeapStream(int cap) { - Data = new byte[cap]; + Debug.Assert(cap >= 0); + + _data = new byte[cap]; } /// @@ -44,7 +49,9 @@ namespace Apache.Ignite.Core.Impl.Portable.IO /// Data array. public PortableHeapStream(byte[] data) { - Data = data; + Debug.Assert(data != null); + + _data = data; } /** */ @@ -52,7 +59,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { int pos0 = EnsureWriteCapacityAndShift(1); - Data[pos0] = val; + _data[pos0] = val; } /** */ @@ -60,7 +67,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { int pos0 = EnsureReadCapacityAndShift(1); - return Data[pos0]; + return _data[pos0]; } /** */ @@ -68,7 +75,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { int pos0 = EnsureWriteCapacityAndShift(val.Length); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { WriteByteArray0(val, data0 + pos0); } @@ -79,7 +86,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { int pos0 = EnsureReadCapacityAndShift(cnt); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { return ReadByteArray0(cnt, data0 + pos0); } @@ -90,7 +97,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { int pos0 = EnsureWriteCapacityAndShift(val.Length); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { WriteBoolArray0(val, data0 + pos0); } @@ -101,7 +108,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { int pos0 = EnsureReadCapacityAndShift(cnt); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { return ReadBoolArray0(cnt, data0 + pos0); } @@ -112,7 +119,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { int pos0 = EnsureWriteCapacityAndShift(2); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { WriteShort0(val, data0 + pos0); } @@ -123,7 +130,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { int pos0 = EnsureReadCapacityAndShift(2); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { return ReadShort0(data0 + pos0); } @@ -136,7 +143,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO int pos0 = EnsureWriteCapacityAndShift(cnt); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { WriteShortArray0(val, data0 + pos0, cnt); } @@ -149,7 +156,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO int pos0 = EnsureReadCapacityAndShift(cnt0); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { return ReadShortArray0(cnt, data0 + pos0, cnt0); } @@ -162,7 +169,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO int pos0 = EnsureWriteCapacityAndShift(cnt); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { WriteCharArray0(val, data0 + pos0, cnt); } @@ -175,7 +182,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO int pos0 = EnsureReadCapacityAndShift(cnt0); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { return ReadCharArray0(cnt, data0 + pos0, cnt0); } @@ -186,7 +193,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { int pos0 = EnsureWriteCapacityAndShift(4); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { WriteInt0(val, data0 + pos0); } @@ -197,7 +204,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { EnsureWriteCapacity(writePos + 4); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { WriteInt0(val, data0 + writePos); } @@ -208,7 +215,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { int pos0 = EnsureReadCapacityAndShift(4); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { return ReadInt0(data0 + pos0); } @@ -221,7 +228,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO int pos0 = EnsureWriteCapacityAndShift(cnt); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { WriteIntArray0(val, data0 + pos0, cnt); } @@ -234,7 +241,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO int pos0 = EnsureReadCapacityAndShift(cnt0); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { return ReadIntArray0(cnt, data0 + pos0, cnt0); } @@ -247,7 +254,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO int pos0 = EnsureWriteCapacityAndShift(cnt); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { WriteFloatArray0(val, data0 + pos0, cnt); } @@ -260,7 +267,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO int pos0 = EnsureReadCapacityAndShift(cnt0); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { return ReadFloatArray0(cnt, data0 + pos0, cnt0); } @@ -271,7 +278,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { int pos0 = EnsureWriteCapacityAndShift(8); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { WriteLong0(val, data0 + pos0); } @@ -282,7 +289,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { int pos0 = EnsureReadCapacityAndShift(8); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { return ReadLong0(data0 + pos0); } @@ -295,7 +302,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO int pos0 = EnsureWriteCapacityAndShift(cnt); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { WriteLongArray0(val, data0 + pos0, cnt); } @@ -308,7 +315,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO int pos0 = EnsureReadCapacityAndShift(cnt0); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { return ReadLongArray0(cnt, data0 + pos0, cnt0); } @@ -321,7 +328,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO int pos0 = EnsureWriteCapacityAndShift(cnt); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { WriteDoubleArray0(val, data0 + pos0, cnt); } @@ -334,7 +341,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO int pos0 = EnsureReadCapacityAndShift(cnt0); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { return ReadDoubleArray0(cnt, data0 + pos0, cnt0); } @@ -347,7 +354,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO int written; - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { written = WriteString0(chars, charCnt, byteCnt, encoding, data0 + pos0); } @@ -360,7 +367,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO { EnsureWriteCapacity(Pos + cnt); - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { WriteInternal(src, cnt, data0); } @@ -371,30 +378,30 @@ namespace Apache.Ignite.Core.Impl.Portable.IO /** */ public override void Read(byte* dest, int cnt) { - fixed (byte* data0 = Data) + fixed (byte* data0 = _data) { - ReadInternal(dest, cnt, data0); + ReadInternal(data0, dest, cnt); } } /** */ - public override int Remaining() + public override int Remaining { - return Data.Length - Pos; + get { return _data.Length - Pos; } } /** */ - public override byte[] Array() + public override byte[] GetArray() { - return Data; + return _data; } /** */ - public override byte[] ArrayCopy() + public override byte[] GetArrayCopy() { byte[] copy = new byte[Pos]; - Buffer.BlockCopy(Data, 0, copy, 0, Pos); + Buffer.BlockCopy(_data, 0, copy, 0, Pos); return copy; } @@ -402,7 +409,7 @@ namespace Apache.Ignite.Core.Impl.Portable.IO /** */ public override bool IsSameArray(byte[] arr) { - return Data == arr; + return _data == arr; } /** */ @@ -416,32 +423,32 @@ namespace Apache.Ignite.Core.Impl.Portable.IO /// internal byte[] InternalArray { - get { return Data; } + get { return _data; } } /** */ protected override void EnsureWriteCapacity(int cnt) { - if (cnt > Data.Length) + if (cnt > _data.Length) { - int newCap = Capacity(Data.Length, cnt); + int newCap = Capacity(_data.Length, cnt); byte[] data0 = new byte[newCap]; // Copy the whole initial array length here because it can be changed // from Java without position adjusting. - Buffer.BlockCopy(Data, 0, data0, 0, Data.Length); + Buffer.BlockCopy(_data, 0, data0, 0, _data.Length); - Data = data0; + _data = data0; } } /** */ protected override void EnsureReadCapacity(int cnt) { - if (Data.Length - Pos < cnt) + if (_data.Length - Pos < cnt) throw new EndOfStreamException("Not enough data in stream [expected=" + cnt + - ", remaining=" + (Data.Length - Pos) + ']'); + ", remaining=" + (_data.Length - Pos) + ']'); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ebb59902/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs index 9767037..08a1d00 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs @@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Impl.Portable using System.Diagnostics; using System.IO; using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Portable.IO; using Apache.Ignite.Core.Impl.Portable.Metadata; using Apache.Ignite.Core.Portable; @@ -357,9 +358,9 @@ namespace Apache.Ignite.Core.Impl.Portable inStream.Seek(_obj.Offset, SeekOrigin.Begin); // Assume that resulting length will be no less than header + [fields_cnt] * 12; - int len = PortableUtils.FullHdrLen + (_vals == null ? 0 : _vals.Count * 12); + int estimatedCapacity = PortableObjectHeader.Size + (_vals == null ? 0 : _vals.Count*12); - PortableHeapStream outStream = new PortableHeapStream(len); + PortableHeapStream outStream = new PortableHeapStream(estimatedCapacity); PortableWriterImpl writer = _portables.Marshaller.StartMarshal(outStream); @@ -377,8 +378,8 @@ namespace Apache.Ignite.Core.Impl.Portable _portables.Marshaller.FinishMarshal(writer); // Create portable object once metadata is processed. - return new PortableUserObject(_portables.Marshaller, outStream.InternalArray, 0, - _desc.TypeId, _hashCode); + return new PortableUserObject(_portables.Marshaller, outStream.InternalArray, 0, + PortableObjectHeader.Read(outStream, 0)); } finally { @@ -568,7 +569,7 @@ namespace Apache.Ignite.Core.Impl.Portable /// New hash. /// Values to be replaced. /// Mutated object. - private void Mutate0(Context ctx, PortableHeapStream inStream, IPortableStream outStream, + private unsafe void Mutate0(Context ctx, PortableHeapStream inStream, IPortableStream outStream, bool changeHash, int hash, IDictionary vals) { int inStartPos = inStream.Position; @@ -605,13 +606,9 @@ namespace Apache.Ignite.Core.Impl.Portable } else if (inHdr == PortableUtils.HdrFull) { - PortableUtils.ValidateProtocolVersion(inStream); - - byte inUsrFlag = inStream.ReadByte(); - int inTypeId = inStream.ReadInt(); - int inHash = inStream.ReadInt(); - int inLen = inStream.ReadInt(); - int inRawOff = inStream.ReadInt(); + var inHeader = PortableObjectHeader.Read(inStream, inStartPos); + + PortableUtils.ValidateProtocolVersion(inHeader.Version); int hndPos; @@ -620,104 +617,106 @@ namespace Apache.Ignite.Core.Impl.Portable // Object could be cached in parent builder. PortableBuilderField cachedVal; - if (_parent._cache != null && _parent._cache.TryGetValue(inStartPos, out cachedVal)) { + if (_parent._cache != null && _parent._cache.TryGetValue(inStartPos, out cachedVal)) + { WriteField(ctx, cachedVal); } else { // New object, write in full form. - outStream.WriteByte(PortableUtils.HdrFull); - outStream.WriteByte(PortableUtils.ProtoVer); - outStream.WriteByte(inUsrFlag); - outStream.WriteInt(inTypeId); - outStream.WriteInt(changeHash ? hash : inHash); + var inSchema = inHeader.ReadSchema(inStream, inStartPos); - // Skip length and raw offset as they are not known at this point. - outStream.Seek(8, SeekOrigin.Current); + var outSchemaLen = vals.Count + (inSchema == null ? 0 : inSchema.Length); + var outSchema = outSchemaLen > 0 + ? new ResizeableArray(outSchemaLen) + : null; - // Write regular fields. - while (inStream.Position < inStartPos + inRawOff) + // Skip header as it is not known at this point. + outStream.Seek(PortableObjectHeader.Size, SeekOrigin.Current); + + if (inSchema != null) { - int inFieldId = inStream.ReadInt(); - int inFieldLen = inStream.ReadInt(); - int inFieldDataPos = inStream.Position; + foreach (var inField in inSchema) + { + PortableBuilderField fieldVal; - PortableBuilderField fieldVal; + var fieldFound = vals.TryGetValue(inField.Id, out fieldVal); - bool fieldFound = vals.TryGetValue(inFieldId, out fieldVal); + if (fieldFound && fieldVal == PortableBuilderField.RmvMarker) + continue; - if (!fieldFound || fieldVal != PortableBuilderField.RmvMarker) - { - outStream.WriteInt(inFieldId); + // ReSharper disable once PossibleNullReferenceException (can't be null) + outSchema.Add(new PortableObjectSchemaField(inField.Id, outStream.Position - outStartPos)); - int fieldLenPos = outStream.Position; // Here we will write length later. - - outStream.Seek(4, SeekOrigin.Current); + if (!fieldFound) + fieldFound = _parent._cache != null && + _parent._cache.TryGetValue(inField.Offset + inStartPos, out fieldVal); if (fieldFound) { - // Replace field with new value. - if (fieldVal != PortableBuilderField.RmvMarker) - WriteField(ctx, fieldVal); + WriteField(ctx, fieldVal); - vals.Remove(inFieldId); + vals.Remove(inField.Id); } else { - // If field was requested earlier, then we must write tracked value - if (_parent._cache != null && _parent._cache.TryGetValue(inFieldDataPos, out fieldVal)) - WriteField(ctx, fieldVal); - else - // Field is not tracked, re-write as is. - Mutate0(ctx, inStream, outStream, false, 0, EmptyVals); - } - - int fieldEndPos = outStream.Position; + // Field is not tracked, re-write as is. + inStream.Seek(inField.Offset + inStartPos, SeekOrigin.Begin); - outStream.Seek(fieldLenPos, SeekOrigin.Begin); - outStream.WriteInt(fieldEndPos - fieldLenPos - 4); - outStream.Seek(fieldEndPos, SeekOrigin.Begin); + Mutate0(ctx, inStream, outStream, false, 0, EmptyVals); + } } - - // Position intput stream pointer after the field. - inStream.Seek(inFieldDataPos + inFieldLen, SeekOrigin.Begin); } // Write remaining new fields. foreach (var valEntry in vals) { - if (valEntry.Value != PortableBuilderField.RmvMarker) - { - outStream.WriteInt(valEntry.Key); + if (valEntry.Value == PortableBuilderField.RmvMarker) + continue; + + // ReSharper disable once PossibleNullReferenceException (can't be null) + outSchema.Add(new PortableObjectSchemaField(valEntry.Key, outStream.Position - outStartPos)); + + WriteField(ctx, valEntry.Value); + } - int fieldLenPos = outStream.Position; // Here we will write length later. + if (outSchema != null && outSchema.Count == 0) + outSchema = null; - outStream.Seek(4, SeekOrigin.Current); + // Write raw data. + int outRawOff = outStream.Position - outStartPos; - WriteField(ctx, valEntry.Value); + int inRawOff = inHeader.GetRawOffset(inStream, inStartPos); + int inRawLen = inHeader.SchemaOffset - inRawOff; - int fieldEndPos = outStream.Position; + if (inRawLen > 0) + outStream.Write(inStream.InternalArray, inStartPos + inRawOff, inRawLen); - outStream.Seek(fieldLenPos, SeekOrigin.Begin); - outStream.WriteInt(fieldEndPos - fieldLenPos - 4); - outStream.Seek(fieldEndPos, SeekOrigin.Begin); - } + // Write schema + int outSchemaOff = outRawOff; + + if (outSchema != null) + { + outSchemaOff = outStream.Position - outStartPos; + + PortableObjectSchemaField.WriteArray(outSchema.Array, outStream, outSchema.Count); + + if (inRawLen > 0) + outStream.WriteInt(outRawOff); } - // Write raw data. - int rawPos = outStream.Position; + var outSchemaId = PortableUtils.GetSchemaId(outSchema); - outStream.Write(inStream.InternalArray, inStartPos + inRawOff, inLen - inRawOff); + var outLen = outStream.Position - outStartPos; - // Write length and raw data offset. - int outResPos = outStream.Position; + var outHash = changeHash ? hash : inHeader.HashCode; - outStream.Seek(outStartPos + PortableUtils.OffsetLen, SeekOrigin.Begin); + var outHeader = new PortableObjectHeader(inHeader.IsUserType, inHeader.TypeId, outHash, + outLen, outSchemaId, outSchemaOff, outSchema == null); - outStream.WriteInt(outResPos - outStartPos); // Length. - outStream.WriteInt(rawPos - outStartPos); // Raw offset. + PortableObjectHeader.Write(outHeader, outStream, outStartPos); - outStream.Seek(outResPos, SeekOrigin.Begin); + outStream.Seek(outStartPos + outLen, SeekOrigin.Begin); // seek to the end of the object } } else @@ -728,7 +727,7 @@ namespace Apache.Ignite.Core.Impl.Portable } // Synchronize input stream position. - inStream.Seek(inStartPos + inLen, SeekOrigin.Begin); + inStream.Seek(inStartPos + inHeader.Length, SeekOrigin.Begin); } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/ebb59902/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs index a8d7058..5ea7a55 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs @@ -119,7 +119,7 @@ namespace Apache.Ignite.Core.Impl.Portable Marshal(val, stream); - return stream.ArrayCopy(); + return stream.GetArrayCopy(); } /// http://git-wip-us.apache.org/repos/asf/ignite/blob/ebb59902/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableObjectHeader.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableObjectHeader.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableObjectHeader.cs new file mode 100644 index 0000000..b3768a0 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableObjectHeader.cs @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.IO; + using System.Runtime.InteropServices; + using Apache.Ignite.Core.Impl.Portable.IO; + + /// + /// Portable object header structure. + /// + [StructLayout(LayoutKind.Sequential)] + internal struct PortableObjectHeader : IEquatable + { + /** Size, equals to sizeof(PortableObjectHeader) */ + public const int Size = 24; + + /** User type flag */ + private const int FlagUserType = 0x1; + + /** Raw only flag */ + private const int FlagRawOnly = 0x2; + + /** Actual header layout */ + public readonly byte Header; // Header code, always 103 (HdrFull) + public readonly byte Version; // Protocol version + public readonly short Flags; // Flags + public readonly int TypeId; // Type ID + public readonly int HashCode; // Hash code + public readonly int Length; // Length, including header + public readonly int SchemaId; // Schema ID (Fnv1 of field type ids) + public readonly int SchemaOffset; // Schema offset, or raw offset when RawOnly flag is set. + + /// + /// Initializes a new instance of the struct. + /// + /// User type flag. + /// Type ID. + /// Hash code. + /// Length. + /// Schema ID. + /// Schema offset. + /// Raw flag. + public PortableObjectHeader(bool userType, int typeId, int hashCode, int length, int schemaId, int schemaOffset, bool rawOnly) + { + Header = PortableUtils.HdrFull; + Version = PortableUtils.ProtoVer; + + Debug.Assert(schemaOffset <= length); + Debug.Assert(schemaOffset >= Size); + + Flags = (short) (userType ? FlagUserType : 0); + + if (rawOnly) + Flags = (short) (Flags | FlagRawOnly); + + TypeId = typeId; + HashCode = hashCode; + Length = length; + SchemaId = schemaId; + SchemaOffset = schemaOffset; + } + + /// + /// Initializes a new instance of the struct from specified stream. + /// + /// The stream. + private PortableObjectHeader(IPortableStream stream) + { + Header = stream.ReadByte(); + Version = stream.ReadByte(); + Flags = stream.ReadShort(); + Length = stream.ReadInt(); + TypeId = stream.ReadInt(); + HashCode = stream.ReadInt(); + SchemaId = stream.ReadInt(); + SchemaOffset = stream.ReadInt(); + } + + /// + /// Writes this instance to the specified stream. + /// + /// The stream. + private void Write(IPortableStream stream) + { + stream.WriteByte(Header); + stream.WriteByte(Version); + stream.WriteShort(Flags); + stream.WriteInt(Length); + stream.WriteInt(TypeId); + stream.WriteInt(HashCode); + stream.WriteInt(SchemaId); + stream.WriteInt(SchemaOffset); + } + + /// + /// Gets a user type flag. + /// + public bool IsUserType + { + get { return (Flags & FlagUserType) == FlagUserType; } + } + + /// + /// Gets a raw-only flag. + /// + public bool IsRawOnly + { + get { return (Flags & FlagRawOnly) == FlagRawOnly; } + } + + /// + /// Gets a value indicating whether this instance has raw offset. + /// + public bool HasRawOffset + { + get + { + // Odd amount of records in schema => raw offset is the very last 4 bytes in object. + return !IsRawOnly && (((Length - SchemaOffset) >> 2) & 0x1) != 0x0; + } + } + + /// + /// Gets the schema field count. + /// + public int SchemaFieldCount + { + get + { + if (IsRawOnly) + return 0; + + var schemaSize = Length - SchemaOffset; + + if (HasRawOffset) + schemaSize -= 4; + + return schemaSize >> 3; // 8 == PortableObjectSchemaField.Size + } + } + + /// + /// Gets the schema end. + /// + public int GetSchemaEnd(int position) + { + var res = position + Length; + + if (HasRawOffset) + res -= 4; + + return res; + } + + /// + /// Gets the schema start. + /// + public int GetSchemaStart(int position) + { + return IsRawOnly ? GetSchemaEnd(position) : position + SchemaOffset; + } + + /// + /// Gets the raw offset of this object in specified stream. + /// + /// The stream. + /// The position. + /// Raw offset. + public int GetRawOffset(IPortableStream stream, int position) + { + Debug.Assert(stream != null); + + if (!HasRawOffset) + return SchemaOffset; + + stream.Seek(position + Length - 4, SeekOrigin.Begin); + + return stream.ReadInt(); + } + + /// + /// Reads the schema as dictionary according to this header data. + /// + /// The stream. + /// The position. + /// Schema. + public Dictionary ReadSchemaAsDictionary(IPortableStream stream, int position) + { + Debug.Assert(stream != null); + + var schemaSize = SchemaFieldCount; + + if (schemaSize == 0) + return null; + + stream.Seek(position + SchemaOffset, SeekOrigin.Begin); + + var schema = new Dictionary(schemaSize >> 3); + + for (var i = 0; i < schemaSize; i++) + schema.Add(stream.ReadInt(), stream.ReadInt()); + + return schema; + } + + /// + /// Reads the schema according to this header data. + /// + /// The stream. + /// The position. + /// Schema. + public PortableObjectSchemaField[] ReadSchema(IPortableStream stream, int position) + { + Debug.Assert(stream != null); + + var schemaSize = SchemaFieldCount; + + if (schemaSize == 0) + return null; + + stream.Seek(position + SchemaOffset, SeekOrigin.Begin); + + return PortableObjectSchemaField.ReadArray(stream, schemaSize); + } + + /// + /// Writes specified header to a stream. + /// + /// The header. + /// The stream. + /// The position. + public static unsafe void Write(PortableObjectHeader header, IPortableStream stream, int position) + { + Debug.Assert(stream != null); + Debug.Assert(position >= 0); + + stream.Seek(position, SeekOrigin.Begin); + + if (BitConverter.IsLittleEndian) + stream.Write((byte*) &header, Size); + else + header.Write(stream); + } + + /// + /// Reads an instance from stream. + /// + /// The stream. + /// The position. + /// Instance of the header. + public static unsafe PortableObjectHeader Read(IPortableStream stream, int position) + { + Debug.Assert(stream != null); + Debug.Assert(position >= 0); + + stream.Seek(position, SeekOrigin.Begin); + + if (BitConverter.IsLittleEndian) + { + var hdr = new PortableObjectHeader(); + + stream.Read((byte*) &hdr, Size); + + Debug.Assert(hdr.Version == PortableUtils.ProtoVer); + Debug.Assert(hdr.SchemaOffset <= hdr.Length); + Debug.Assert(hdr.SchemaOffset >= Size); + + return hdr; + } + + return new PortableObjectHeader(stream); + } + + /** */ + public bool Equals(PortableObjectHeader other) + { + return Header == other.Header && + Version == other.Version && + Flags == other.Flags && + TypeId == other.TypeId && + HashCode == other.HashCode && + Length == other.Length && + SchemaId == other.SchemaId && + SchemaOffset == other.SchemaOffset; + } + + /** */ + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + + return obj is PortableObjectHeader && Equals((PortableObjectHeader) obj); + } + + /** */ + public override int GetHashCode() + { + unchecked + { + var hashCode = Header.GetHashCode(); + hashCode = (hashCode*397) ^ Version.GetHashCode(); + hashCode = (hashCode*397) ^ Flags.GetHashCode(); + hashCode = (hashCode*397) ^ TypeId; + hashCode = (hashCode*397) ^ HashCode; + hashCode = (hashCode*397) ^ Length; + hashCode = (hashCode*397) ^ SchemaId; + hashCode = (hashCode*397) ^ SchemaOffset; + return hashCode; + } + } + + /** */ + public static bool operator ==(PortableObjectHeader left, PortableObjectHeader right) + { + return left.Equals(right); + } + + /** */ + public static bool operator !=(PortableObjectHeader left, PortableObjectHeader right) + { + return !left.Equals(right); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ebb59902/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableObjectSchemaField.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableObjectSchemaField.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableObjectSchemaField.cs new file mode 100644 index 0000000..5d489c3 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableObjectSchemaField.cs @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable +{ + using System; + using System.Diagnostics; + using System.Runtime.InteropServices; + using Apache.Ignite.Core.Impl.Portable.IO; + + [StructLayout(LayoutKind.Sequential)] + internal struct PortableObjectSchemaField + { + /* Field ID */ + public readonly int Id; + + /** Offset. */ + public readonly int Offset; + + /** Size, equals to sizeof(PortableObjectSchemaField) */ + private const int Size = 8; + + /// + /// Initializes a new instance of the struct. + /// + /// The id. + /// The offset. + public PortableObjectSchemaField(int id, int offset) + { + Id = id; + Offset = offset; + } + + /// + /// Writes an array of fields to a stream. + /// + /// Fields. + /// Stream. + /// Field count to write. + public static unsafe void WriteArray(PortableObjectSchemaField[] fields, IPortableStream stream, int count) + { + Debug.Assert(fields != null); + Debug.Assert(stream != null); + Debug.Assert(count > 0); + + if (BitConverter.IsLittleEndian) + { + fixed (PortableObjectSchemaField* ptr = &fields[0]) + { + stream.Write((byte*) ptr, count * Size); + } + } + else + { + for (int i = 0; i < count; i++) + { + var field = fields[i]; + + stream.WriteInt(field.Id); + stream.WriteInt(field.Offset); + } + } + } + + /// + /// Reads an array of fields from a stream. + /// + /// Stream. + /// Count. + /// + public static unsafe PortableObjectSchemaField[] ReadArray(IPortableStream stream, int count) + { + Debug.Assert(stream != null); + Debug.Assert(count > 0); + + var res = new PortableObjectSchemaField[count]; + + if (BitConverter.IsLittleEndian) + { + fixed (PortableObjectSchemaField* ptr = &res[0]) + { + stream.Read((byte*) ptr, count * Size); + } + } + else + { + for (int i = 0; i < count; i++) + { + res[i] = new PortableObjectSchemaField(stream.ReadInt(), stream.ReadInt()); + } + } + + return res; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ebb59902/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableReaderImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableReaderImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableReaderImpl.cs index 422d628..a289816 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableReaderImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableReaderImpl.cs @@ -66,6 +66,12 @@ namespace Apache.Ignite.Core.Impl.Portable /** Current type structure tracker. */ private PortableStructureTracker _curStruct; + /** */ + private int _curFooterStart; + + /** */ + private int _curFooterEnd; + /// /// Constructor. @@ -554,10 +560,10 @@ namespace Apache.Ignite.Core.Impl.Portable { var len = Stream.ReadInt(); - var portablePos = Stream.Position; + var portableBytesPos = Stream.Position; if (_mode != PortableMode.Deserialize) - return TypeCaster.Cast(ReadAsPortable(portablePos, len, doDetach)); + return TypeCaster.Cast(ReadAsPortable(portableBytesPos, len, doDetach)); Stream.Seek(len, SeekOrigin.Current); @@ -565,7 +571,7 @@ namespace Apache.Ignite.Core.Impl.Portable var retPos = Stream.Position; - Stream.Seek(portablePos + offset, SeekOrigin.Begin); + Stream.Seek(portableBytesPos + offset, SeekOrigin.Begin); _mode = PortableMode.KeepPortable; @@ -584,30 +590,28 @@ namespace Apache.Ignite.Core.Impl.Portable /// /// Reads the portable object in portable form. /// - private PortableUserObject ReadAsPortable(int dataPos, int dataLen, bool doDetach) + private PortableUserObject ReadAsPortable(int portableBytesPos, int dataLen, bool doDetach) { try { - Stream.Seek(dataLen + dataPos, SeekOrigin.Begin); + Stream.Seek(dataLen + portableBytesPos, SeekOrigin.Begin); var offs = Stream.ReadInt(); // offset inside data - var pos = dataPos + offs; + var pos = portableBytesPos + offs; - if (!doDetach) - return GetPortableUserObject(pos, pos, Stream.Array()); - - Stream.Seek(pos + PortableUtils.OffsetLen, SeekOrigin.Begin); + var hdr = PortableObjectHeader.Read(Stream, pos); - var len = Stream.ReadInt(); + if (!doDetach) + return new PortableUserObject(_marsh, Stream.GetArray(), pos, hdr); Stream.Seek(pos, SeekOrigin.Begin); - return GetPortableUserObject(pos, 0, Stream.ReadByteArray(len)); + return new PortableUserObject(_marsh, Stream.ReadByteArray(hdr.Length), 0, hdr); } finally { - Stream.Seek(dataPos + dataLen + 4, SeekOrigin.Begin); + Stream.Seek(portableBytesPos + dataLen + 4, SeekOrigin.Begin); } } @@ -617,16 +621,10 @@ namespace Apache.Ignite.Core.Impl.Portable [SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals", MessageId = "hashCode")] private T ReadFullObject(int pos) { - // Validate protocol version. - PortableUtils.ValidateProtocolVersion(Stream); + var hdr = PortableObjectHeader.Read(Stream, pos); - // Read header. - bool userType = Stream.ReadBool(); - int typeId = Stream.ReadInt(); - // ReSharper disable once UnusedVariable - int hashCode = Stream.ReadInt(); - int len = Stream.ReadInt(); - int rawOffset = Stream.ReadInt(); + // Validate protocol version. + PortableUtils.ValidateProtocolVersion(hdr.Version); try { @@ -636,7 +634,7 @@ namespace Apache.Ignite.Core.Impl.Portable if (_hnds != null && _hnds.TryGetValue(pos, out hndObj)) return (T) hndObj; - if (userType && _mode == PortableMode.ForcePortable) + if (hdr.IsUserType && _mode == PortableMode.ForcePortable) { PortableUserObject portObj; @@ -644,10 +642,10 @@ namespace Apache.Ignite.Core.Impl.Portable { Stream.Seek(pos, SeekOrigin.Begin); - portObj = GetPortableUserObject(pos, 0, Stream.ReadByteArray(len)); + portObj = new PortableUserObject(_marsh, Stream.ReadByteArray(hdr.Length), 0, hdr); } else - portObj = GetPortableUserObject(pos, pos, Stream.Array()); + portObj = new PortableUserObject(_marsh, Stream.GetArray(), pos, hdr); T obj = _builder == null ? TypeCaster.Cast(portObj) : TypeCaster.Cast(_builder.Child(portObj)); @@ -660,8 +658,8 @@ namespace Apache.Ignite.Core.Impl.Portable // Find descriptor. IPortableTypeDescriptor desc; - if (!_descs.TryGetValue(PortableUtils.TypeKey(userType, typeId), out desc)) - throw new PortableException("Unknown type ID: " + typeId); + if (!_descs.TryGetValue(PortableUtils.TypeKey(hdr.IsUserType, hdr.TypeId), out desc)) + throw new PortableException("Unknown type ID: " + hdr.TypeId); // Instantiate object. if (desc.Type == null) @@ -674,15 +672,21 @@ namespace Apache.Ignite.Core.Impl.Portable int oldRawOffset = _curRawOffset; var oldStruct = _curStruct; bool oldRaw = _curRaw; + var oldFooterStart = _curFooterStart; + var oldFooterEnd = _curFooterEnd; // Set new frame. - _curTypeId = typeId; + _curTypeId = hdr.TypeId; _curPos = pos; - _curRawOffset = rawOffset; + _curFooterEnd = hdr.GetSchemaEnd(pos); + _curFooterStart = hdr.GetSchemaStart(pos); + _curRawOffset = hdr.GetRawOffset(Stream, pos); _curStruct = new PortableStructureTracker(desc, desc.ReaderTypeStructure); _curRaw = false; // Read object. + Stream.Seek(pos + PortableObjectHeader.Size, SeekOrigin.Begin); + object obj; var sysSerializer = desc.Serializer as IPortableSystemTypeSerializer; @@ -715,6 +719,8 @@ namespace Apache.Ignite.Core.Impl.Portable _curRawOffset = oldRawOffset; _curStruct = oldStruct; _curRaw = oldRaw; + _curFooterStart = oldFooterStart; + _curFooterEnd = oldFooterEnd; // Process wrappers. We could introduce a common interface, but for only 2 if-else is faster. var wrappedSerializable = obj as SerializableObjectHolder; @@ -733,7 +739,7 @@ namespace Apache.Ignite.Core.Impl.Portable finally { // Advance stream pointer. - Stream.Seek(pos + len, SeekOrigin.Begin); + Stream.Seek(pos + hdr.Length, SeekOrigin.Begin); } } @@ -817,51 +823,21 @@ namespace Apache.Ignite.Core.Impl.Portable /// True in case the field was found and position adjusted, false otherwise. private bool SeekField(int fieldId) { - // This method is expected to be called when stream pointer is set either before - // the field or on raw data offset. - int start = _curPos + PortableUtils.FullHdrLen; - int end = _curPos + _curRawOffset; - - int initial = Stream.Position; - - int cur = initial; - - while (cur < end) - { - int id = Stream.ReadInt(); - - if (fieldId == id) - { - // Field is found, return. - Stream.Seek(4, SeekOrigin.Current); - - return true; - } - - Stream.Seek(Stream.ReadInt(), SeekOrigin.Current); - - cur = Stream.Position; - } + Stream.Seek(_curFooterStart, SeekOrigin.Begin); - Stream.Seek(start, SeekOrigin.Begin); - - cur = start; - - while (cur < initial) + while (Stream.Position < _curFooterEnd) { - int id = Stream.ReadInt(); + var id = Stream.ReadInt(); - if (fieldId == id) + if (id == fieldId) { - // Field is found, return. - Stream.Seek(4, SeekOrigin.Current); + var fieldOffset = Stream.ReadInt(); + Stream.Seek(_curPos + fieldOffset, SeekOrigin.Begin); return true; } - - Stream.Seek(Stream.ReadInt(), SeekOrigin.Current); - cur = Stream.Position; + Stream.Seek(4, SeekOrigin.Current); } return false; @@ -939,22 +915,5 @@ namespace Apache.Ignite.Core.Impl.Portable { return IsNotNullHeader(expHdr) ? readFunc(Stream) : default(T); } - - /// - /// Gets the portable user object from a byte array. - /// - /// Position in the current stream. - /// Offset in the byte array. - /// Bytes. - private PortableUserObject GetPortableUserObject(int pos, int offs, byte[] bytes) - { - Stream.Seek(pos + PortableUtils.OffsetTypeId, SeekOrigin.Begin); - - var id = Stream.ReadInt(); - - var hash = Stream.ReadInt(); - - return new PortableUserObject(_marsh, bytes, offs, id, hash); - } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ebb59902/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUserObject.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUserObject.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUserObject.cs index c241b96..300281b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUserObject.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUserObject.cs @@ -31,6 +31,9 @@ namespace Apache.Ignite.Core.Impl.Portable /// internal class PortableUserObject : IPortableObject { + /** Cache empty dictionary. */ + private static readonly IDictionary EmptyFields = new Dictionary(); + /** Marshaller. */ private readonly PortableMarshaller _marsh; @@ -40,11 +43,8 @@ namespace Apache.Ignite.Core.Impl.Portable /** Offset in data array. */ private readonly int _offset; - /** Type ID. */ - private readonly int _typeId; - - /** Hash code. */ - private readonly int _hashCode; + /** Header. */ + private readonly PortableObjectHeader _header; /** Fields. */ private volatile IDictionary _fields; @@ -53,28 +53,26 @@ namespace Apache.Ignite.Core.Impl.Portable private object _deserialized; /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// Marshaller. /// Raw data of this portable object. /// Offset in data array. - /// Type ID. - /// Hash code. - public PortableUserObject(PortableMarshaller marsh, byte[] data, int offset, int typeId, int hashCode) + /// The header. + public PortableUserObject(PortableMarshaller marsh, byte[] data, int offset, PortableObjectHeader header) { _marsh = marsh; _data = data; _offset = offset; - _typeId = typeId; - _hashCode = hashCode; + _header = header; } /** */ public int TypeId { - get { return _typeId; } + get { return _header.TypeId; } } /** */ @@ -95,7 +93,7 @@ namespace Apache.Ignite.Core.Impl.Portable { IPortableStream stream = new PortableHeapStream(_data); - stream.Seek(pos, SeekOrigin.Begin); + stream.Seek(pos + _offset, SeekOrigin.Begin); return _marsh.Unmarshal(stream, PortableMode.ForcePortable, builder); } @@ -123,7 +121,7 @@ namespace Apache.Ignite.Core.Impl.Portable T res = _marsh.Unmarshal(stream, mode); - IPortableTypeDescriptor desc = _marsh.GetDescriptor(true, _typeId); + IPortableTypeDescriptor desc = _marsh.GetDescriptor(true, _header.TypeId); if (!desc.KeepDeserialized) return res; @@ -137,7 +135,7 @@ namespace Apache.Ignite.Core.Impl.Portable /** */ public IPortableMetadata GetMetadata() { - return _marsh.GetMetadata(_typeId); + return _marsh.GetMetadata(_header.TypeId); } /// @@ -158,11 +156,11 @@ namespace Apache.Ignite.Core.Impl.Portable public bool TryGetFieldPosition(string fieldName, out int pos) { - var desc = _marsh.GetDescriptor(true, _typeId); + var desc = _marsh.GetDescriptor(true, _header.TypeId); InitializeFields(); - int fieldId = PortableUtils.FieldId(_typeId, fieldName, desc.NameConverter, desc.Mapper); + int fieldId = PortableUtils.FieldId(_header.TypeId, fieldName, desc.NameConverter, desc.Mapper); return _fields.TryGetValue(fieldId, out pos); } @@ -172,22 +170,20 @@ namespace Apache.Ignite.Core.Impl.Portable /// private void InitializeFields() { - if (_fields == null) - { - IPortableStream stream = new PortableHeapStream(_data); + if (_fields != null) + return; - stream.Seek(_offset + PortableUtils.OffsetRaw, SeekOrigin.Begin); + var stream = new PortableHeapStream(_data); - int rawDataOffset = stream.ReadInt(); + var hdr = PortableObjectHeader.Read(stream, _offset); - _fields = PortableUtils.ObjectFields(stream, _typeId, rawDataOffset); - } + _fields = hdr.ReadSchemaAsDictionary(stream, _offset) ?? EmptyFields; } /** */ public override int GetHashCode() { - return _hashCode; + return _header.HashCode; } /** */ @@ -203,8 +199,8 @@ namespace Apache.Ignite.Core.Impl.Portable if (_data == that._data && _offset == that._offset) return true; - // 1. Check hash code and type IDs. - if (_hashCode == that._hashCode && _typeId == that._typeId) + // 1. Check headers + if (_header == that._header) { // 2. Check if objects have the same field sets. InitializeFields(); @@ -215,7 +211,7 @@ namespace Apache.Ignite.Core.Impl.Portable foreach (int id in _fields.Keys) { - if (!that._fields.Keys.Contains(id)) + if (!that._fields.ContainsKey(id)) return false; } @@ -230,18 +226,16 @@ namespace Apache.Ignite.Core.Impl.Portable } // 4. Check if objects have the same raw data. - IPortableStream stream = new PortableHeapStream(_data); - stream.Seek(_offset + PortableUtils.OffsetLen, SeekOrigin.Begin); - int len = stream.ReadInt(); - int rawOffset = stream.ReadInt(); - - IPortableStream thatStream = new PortableHeapStream(that._data); - thatStream.Seek(_offset + PortableUtils.OffsetLen, SeekOrigin.Begin); - int thatLen = thatStream.ReadInt(); - int thatRawOffset = thatStream.ReadInt(); - - return PortableUtils.CompareArrays(_data, _offset + rawOffset, len - rawOffset, that._data, - that._offset + thatRawOffset, thatLen - thatRawOffset); + // ReSharper disable ImpureMethodCallOnReadonlyValueField (method is not impure) + var stream = new PortableHeapStream(_data); + var rawOffset = _header.GetRawOffset(stream, _offset); + + var thatStream = new PortableHeapStream(that._data); + var thatRawOffset = that._header.GetRawOffset(thatStream, that._offset); + // ReSharper restore ImpureMethodCallOnReadonlyValueField + + return PortableUtils.CompareArrays(_data, _offset + rawOffset, _header.Length - rawOffset, + that._data, that._offset + thatRawOffset, that._header.Length - thatRawOffset); } } @@ -270,13 +264,13 @@ namespace Apache.Ignite.Core.Impl.Portable StringBuilder sb; - IPortableTypeDescriptor desc = _marsh.GetDescriptor(true, _typeId); + IPortableTypeDescriptor desc = _marsh.GetDescriptor(true, _header.TypeId); IPortableMetadata meta; try { - meta = _marsh.GetMetadata(_typeId); + meta = _marsh.GetMetadata(_header.TypeId); } catch (IgniteException) { @@ -284,7 +278,7 @@ namespace Apache.Ignite.Core.Impl.Portable } if (meta == null) - sb = new StringBuilder("PortableObject [typeId=").Append(_typeId).Append(", idHash=" + idHash); + sb = new StringBuilder("PortableObject [typeId=").Append(_header.TypeId).Append(", idHash=" + idHash); else { sb = new StringBuilder(meta.TypeName).Append(" [idHash=" + idHash); @@ -299,7 +293,7 @@ namespace Apache.Ignite.Core.Impl.Portable { sb.Append(", "); - int fieldId = PortableUtils.FieldId(_typeId, fieldName, desc.NameConverter, desc.Mapper); + int fieldId = PortableUtils.FieldId(_header.TypeId, fieldName, desc.NameConverter, desc.Mapper); int fieldPos;