Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3FF44200D3A for ; Wed, 4 Oct 2017 10:44:49 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3EA0F160BDB; Wed, 4 Oct 2017 08:44:49 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EB67A160BD7 for ; Wed, 4 Oct 2017 10:44:47 +0200 (CEST) Received: (qmail 8919 invoked by uid 500); 4 Oct 2017 08:44:47 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 8515 invoked by uid 99); 4 Oct 2017 08:44:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Oct 2017 08:44:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 175A0F5C54; Wed, 4 Oct 2017 08:44:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Wed, 04 Oct 2017 08:44:51 -0000 Message-Id: <288a87cbae6b4be8bd9f2f02384f96cc@git.apache.org> In-Reply-To: <5ac53e9d040a42d29e5963f8e9e9644a@git.apache.org> References: <5ac53e9d040a42d29e5963f8e9e9644a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/25] ignite git commit: IGNITE-6517 .NET: DataStreamer DefaultPerNodeBufferSize, DefaultParallelOpsMultiplier, Timeout archived-at: Wed, 04 Oct 2017 08:44:49 -0000 IGNITE-6517 .NET: DataStreamer DefaultPerNodeBufferSize, DefaultParallelOpsMultiplier, Timeout This closes #2785 Project: 
http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5764960e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5764960e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5764960e Branch: refs/heads/ignite-5937 Commit: 5764960e802e91b87956f4515e289eaf0003a2de Parents: 5ca7909 Author: Pavel Tupitsyn Authored: Mon Oct 2 16:48:23 2017 +0300 Committer: Pavel Tupitsyn Committed: Mon Oct 2 16:48:23 2017 +0300 ---------------------------------------------------------------------- .../datastreamer/PlatformDataStreamer.java | 14 ++++++ .../Dataload/DataStreamerTest.cs | 50 +++++++++++++++++--- .../Apache.Ignite.Core.csproj | 1 + .../Datastream/DataStreamerDefaults.cs | 46 ++++++++++++++++++ .../Datastream/IDataStreamer.cs | 21 +++++++- .../Impl/Binary/BinaryReaderExtensions.cs | 10 +--- .../Impl/Binary/BinaryUtils.cs | 14 ++++++ .../Impl/Datastream/DataStreamerImpl.cs | 43 ++++++++++++++++- 8 files changed, 179 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java index fba0a4c..8cd14c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java @@ -86,6 +86,12 @@ public class PlatformDataStreamer extends PlatformAbstractTarget { /** */ private static final int OP_LISTEN_TOPOLOGY = 11; + /** */ + 
private static final int OP_GET_TIMEOUT = 12; + + /** */ + private static final int OP_SET_TIMEOUT = 13; + /** Cache name. */ private final String cacheName; @@ -230,6 +236,14 @@ public class PlatformDataStreamer extends PlatformAbstractTarget { case OP_PER_NODE_PARALLEL_OPS: return ldr.perNodeParallelOperations(); + + case OP_GET_TIMEOUT: + return ldr.timeout(); + + case OP_SET_TIMEOUT: + ldr.timeout(val); + + return TRUE; } return super.processInLongOutLong(type, val); http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs index fe5955f..60a1067 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Dataload/DataStreamerTest.cs @@ -95,25 +95,40 @@ namespace Apache.Ignite.Core.Tests.Dataload { using (IDataStreamer ldr = _grid.GetDataStreamer(CacheName)) { + Assert.AreEqual(CacheName, ldr.CacheName); + Assert.AreEqual(0, ldr.AutoFlushFrequency); + + Assert.IsFalse(ldr.AllowOverwrite); ldr.AllowOverwrite = true; Assert.IsTrue(ldr.AllowOverwrite); ldr.AllowOverwrite = false; Assert.IsFalse(ldr.AllowOverwrite); + Assert.IsFalse(ldr.SkipStore); ldr.SkipStore = true; Assert.IsTrue(ldr.SkipStore); ldr.SkipStore = false; Assert.IsFalse(ldr.SkipStore); + Assert.AreEqual(DataStreamerDefaults.DefaultPerNodeBufferSize, ldr.PerNodeBufferSize); ldr.PerNodeBufferSize = 1; Assert.AreEqual(1, ldr.PerNodeBufferSize); ldr.PerNodeBufferSize = 2; Assert.AreEqual(2, ldr.PerNodeBufferSize); - ldr.PerNodeParallelOperations = 1; - Assert.AreEqual(1, ldr.PerNodeParallelOperations); + Assert.AreEqual(0, ldr.PerNodeParallelOperations); + var ops = 
DataStreamerDefaults.DefaultParallelOperationsMultiplier * + IgniteConfiguration.DefaultThreadPoolSize; + ldr.PerNodeParallelOperations = ops; + Assert.AreEqual(ops, ldr.PerNodeParallelOperations); ldr.PerNodeParallelOperations = 2; Assert.AreEqual(2, ldr.PerNodeParallelOperations); + + Assert.AreEqual(DataStreamerDefaults.DefaultTimeout, ldr.Timeout); + ldr.Timeout = TimeSpan.MaxValue; + Assert.AreEqual(TimeSpan.MaxValue, ldr.Timeout); + ldr.Timeout = TimeSpan.FromSeconds(1.5); + Assert.AreEqual(1.5, ldr.Timeout.TotalSeconds); } } @@ -123,28 +138,37 @@ namespace Apache.Ignite.Core.Tests.Dataload [Test] public void TestAddRemove() { - using (IDataStreamer ldr = _grid.GetDataStreamer(CacheName)) + IDataStreamer ldr; + + using (ldr = _grid.GetDataStreamer(CacheName)) { + Assert.IsFalse(ldr.Task.IsCompleted); + ldr.AllowOverwrite = true; // Additions. - ldr.AddData(1, 1); + var task = ldr.AddData(1, 1); ldr.Flush(); Assert.AreEqual(1, _cache.Get(1)); + Assert.IsTrue(task.IsCompleted); + Assert.IsFalse(ldr.Task.IsCompleted); - ldr.AddData(new KeyValuePair(2, 2)); + task = ldr.AddData(new KeyValuePair(2, 2)); ldr.Flush(); Assert.AreEqual(2, _cache.Get(2)); + Assert.IsTrue(task.IsCompleted); - ldr.AddData(new List> { new KeyValuePair(3, 3), new KeyValuePair(4, 4) }); + task = ldr.AddData(new [] { new KeyValuePair(3, 3), new KeyValuePair(4, 4) }); ldr.Flush(); Assert.AreEqual(3, _cache.Get(3)); Assert.AreEqual(4, _cache.Get(4)); + Assert.IsTrue(task.IsCompleted); // Removal. - ldr.RemoveData(1); + task = ldr.RemoveData(1); ldr.Flush(); Assert.IsFalse(_cache.ContainsKey(1)); + Assert.IsTrue(task.IsCompleted); // Mixed. 
ldr.AddData(5, 5); @@ -165,6 +189,8 @@ namespace Apache.Ignite.Core.Tests.Dataload for (int i = 5; i < 13; i++) Assert.AreEqual(i, _cache.Get(i)); } + + Assert.IsTrue(ldr.Task.IsCompleted); } /// @@ -517,6 +543,16 @@ namespace Apache.Ignite.Core.Tests.Dataload for (var i = 0; i < 100; i++) Assert.AreEqual(i + 1, cache.Get(i).Val); + + // Repeating WithKeepBinary call: valid args. + Assert.AreSame(ldr, ldr.WithKeepBinary()); + + // Invalid type args. + var ex = Assert.Throws(() => ldr.WithKeepBinary()); + + Assert.AreEqual( + "Can't change type of binary streamer. WithKeepBinary has been called on an instance of " + + "binary streamer with incompatible generic arguments.", ex.Message); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index 67c540c..58abd26 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -104,6 +104,7 @@ + http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs new file mode 100644 index 0000000..315ae7f --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/DataStreamerDefaults.cs @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Datastream +{ + using System; + + /// + /// Data streamer configuration defaults. + /// + public static class DataStreamerDefaults + { + /// + /// The default per node buffer size, see . + /// + public const int DefaultPerNodeBufferSize = 512; + + /// + /// Default multiplier for parallel operations per node: + /// = + /// * + /// . + /// + public const int DefaultParallelOperationsMultiplier = 8; + + /// + /// The default timeout (see ). + /// Negative value means no timeout. 
+ /// + public static readonly TimeSpan DefaultTimeout = TimeSpan.FromMilliseconds(-1); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs index 222f6c3..277130c 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Datastream/IDataStreamer.cs @@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Datastream { using System; using System.Collections.Generic; + using System.ComponentModel; using System.Threading.Tasks; using Apache.Ignite.Core.Cache.Store; @@ -110,8 +111,9 @@ namespace Apache.Ignite.Core.Datastream /// /// Setter must be called before any add/remove operation. /// - /// Default is 1024. + /// Default is . /// + [DefaultValue(DataStreamerDefaults.DefaultPerNodeBufferSize)] int PerNodeBufferSize { get; set; } /// @@ -119,7 +121,9 @@ namespace Apache.Ignite.Core.Datastream /// /// Setter must be called before any add/remove operation. /// - /// Default is 16. + /// Default is 0, which means Ignite calculates this automatically as + /// * + /// . /// int PerNodeParallelOperations { get; set; } @@ -208,5 +212,18 @@ namespace Apache.Ignite.Core.Datastream /// Value type in binary mode. /// Streamer instance with binary mode enabled. IDataStreamer WithKeepBinary(); + + /// + /// Gets or sets the timeout. Negative values mean no timeout. + /// Default is . + /// + /// Timeout is used in the following cases: + ///
  • Any data addition method can be blocked when all per node parallel operations are exhausted. + /// The timeout defines the max time you will be blocked waiting for a permit to add a chunk of data + /// into the streamer;
  • + ///
  • Total timeout time for the Flush operation;
  • + ///
  • Total timeout time for the Close operation.
  • + ///
    + TimeSpan Timeout { get; set; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs index 7556c41..da87d21 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs @@ -58,15 +58,7 @@ namespace Apache.Ignite.Core.Impl.Binary /// TimeSpan. public static TimeSpan ReadLongAsTimespan(this IBinaryRawReader reader) { - long ms = reader.ReadLong(); - - if (ms >= TimeSpan.MaxValue.TotalMilliseconds) - return TimeSpan.MaxValue; - - if (ms <= TimeSpan.MinValue.TotalMilliseconds) - return TimeSpan.MinValue; - - return TimeSpan.FromMilliseconds(ms); + return BinaryUtils.LongToTimeSpan(reader.ReadLong()); } /// http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs index 46e6752..139783d 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryUtils.cs @@ -1664,6 +1664,20 @@ namespace Apache.Ignite.Core.Impl.Binary } /// + /// Converts long to timespan. 
+ /// + public static TimeSpan LongToTimeSpan(long ms) + { + if (ms >= TimeSpan.MaxValue.TotalMilliseconds) + return TimeSpan.MaxValue; + + if (ms <= TimeSpan.MinValue.TotalMilliseconds) + return TimeSpan.MinValue; + + return TimeSpan.FromMilliseconds(ms); + } + + /// /// Creates and instance from the type name in reader. /// private static T CreateInstance(BinaryReader reader) http://git-wip-us.apache.org/repos/asf/ignite/blob/5764960e/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs index 96b24ab..555c6e6 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs @@ -93,6 +93,12 @@ namespace Apache.Ignite.Core.Impl.Datastream /** */ private const int OpListenTopology = 11; + /** */ + private const int OpGetTimeout = 12; + + /** */ + private const int OpSetTimeout = 13; + /** Cache name. */ private readonly string _cacheName; @@ -356,8 +362,6 @@ namespace Apache.Ignite.Core.Impl.Datastream { get { - ThrowIfDisposed(); - return _closeFut.Task; } } @@ -549,6 +553,41 @@ namespace Apache.Ignite.Core.Impl.Datastream } /** */ + public TimeSpan Timeout + { + get + { + _rwLock.EnterReadLock(); + + try + { + ThrowIfDisposed(); + + return BinaryUtils.LongToTimeSpan(DoOutInOp(OpGetTimeout)); + } + finally + { + _rwLock.ExitReadLock(); + } + } + set + { + _rwLock.EnterWriteLock(); + + try + { + ThrowIfDisposed(); + + DoOutInOp(OpSetTimeout, (long) value.TotalMilliseconds); + } + finally + { + _rwLock.ExitWriteLock(); + } + } + } + + /** */ [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] protected override void Dispose(bool disposing) {