Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E801218572 for ; Tue, 27 Oct 2015 14:59:43 +0000 (UTC) Received: (qmail 79614 invoked by uid 500); 27 Oct 2015 14:59:31 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 79574 invoked by uid 500); 27 Oct 2015 14:59:31 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 79391 invoked by uid 99); 27 Oct 2015 14:59:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Oct 2015 14:59:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F3675DFCE0; Tue, 27 Oct 2015 14:59:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Tue, 27 Oct 2015 14:59:34 -0000 Message-Id: <3c1282f0916545c784123669dddf9e4a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [5/9] ignite git commit: IGNITE-1652: .Net async API reworked. http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs index af230b3..4ceb292 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs @@ -22,12 +22,11 @@ namespace Apache.Ignite.Core.Impl.Cache using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; - using System.Threading; + using System.Threading.Tasks; using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Expiry; using Apache.Ignite.Core.Cache.Query; using Apache.Ignite.Core.Cache.Query.Continuous; - using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Impl.Cache.Query; using Apache.Ignite.Core.Impl.Cache.Query.Continuous; using Apache.Ignite.Core.Impl.Common; @@ -67,13 +66,8 @@ namespace Apache.Ignite.Core.Impl.Cache /** Flag: no-retries.*/ private readonly bool _flagNoRetries; - /** - * Result converter for async InvokeAll operation. - * In future result processing there is only one TResult generic argument, - * and we can't get the type of ICacheEntryProcessorResult at compile time from it. - * This field caches converter for the last InvokeAll operation to avoid using reflection. - */ - private readonly ThreadLocal _invokeAllConverter = new ThreadLocal(); + /** Async instance. */ + private readonly Lazy> _asyncInstance; /// /// Constructor. @@ -93,54 +87,62 @@ namespace Apache.Ignite.Core.Impl.Cache _flagKeepPortable = flagKeepPortable; _flagAsync = flagAsync; _flagNoRetries = flagNoRetries; + + _asyncInstance = new Lazy>(() => new CacheImpl(this)); } - /** */ - public IIgnite Ignite + /// + /// Initializes a new async instance. + /// + /// The cache. + private CacheImpl(CacheImpl cache) : base(UU.CacheWithAsync(cache.Target), cache.Marshaller) { - get - { - return _ignite; - } + _ignite = cache._ignite; + _flagSkipStore = cache._flagSkipStore; + _flagKeepPortable = cache._flagKeepPortable; + _flagAsync = true; + _flagNoRetries = cache._flagNoRetries; } /** */ - public bool IsAsync + public IIgnite Ignite { - get { return _flagAsync; } + get { return _ignite; } } /** */ - public IFuture GetFuture() + private bool IsAsync { - throw new NotSupportedException("GetFuture() should be called through CacheProxyImpl"); + get { return _flagAsync; } } - /** */ - public IFuture GetFuture() + /// + /// Gets and resets task for previous asynchronous operation. + /// + /// The last async op id. + /// + /// Task for previous asynchronous operation. + /// + private Task GetTask(CacheOp lastAsyncOp) { - throw new NotSupportedException("GetFuture() should be called through CacheProxyImpl"); + return GetTask(lastAsyncOp); } /// - /// Gets and resets future for previous asynchronous operation. + /// Gets and resets task for previous asynchronous operation. /// + /// The type of the result. /// The last async op id. + /// The converter. /// - /// Future for previous asynchronous operation. + /// Task for previous asynchronous operation. /// - /// Asynchronous mode is disabled - internal IFuture GetFuture(CacheOp lastAsyncOp) + private Task GetTask(CacheOp lastAsyncOp, Func converter = null) { - if (!_flagAsync) - throw IgniteUtils.GetAsyncModeDisabledException(); - - var converter = GetFutureResultConverter(lastAsyncOp); - - _invokeAllConverter.Value = null; + Debug.Assert(_flagAsync); return GetFuture((futId, futTypeId) => UU.TargetListenFutureForOperation(Target, futId, futTypeId, - (int) lastAsyncOp), _flagKeepPortable, converter); + (int) lastAsyncOp), _flagKeepPortable, converter).Task; } /** */ @@ -151,6 +153,7 @@ namespace Apache.Ignite.Core.Impl.Cache /** */ + /** */ public bool IsEmpty() { return GetSize() == 0; @@ -225,13 +228,6 @@ namespace Apache.Ignite.Core.Impl.Cache } /** */ - public ICache WithAsync() - { - return _flagAsync ? this : new CacheImpl(_ignite, UU.CacheWithAsync(Target), Marshaller, - _flagSkipStore, _flagKeepPortable, true, _flagNoRetries); - } - - /** */ public bool IsKeepPortable { get { return _flagKeepPortable; } @@ -244,11 +240,27 @@ namespace Apache.Ignite.Core.Impl.Cache } /** */ + public Task LoadCacheAsync(ICacheEntryFilter p, params object[] args) + { + AsyncInstance.LoadCache(p, args); + + return AsyncInstance.GetTask(CacheOp.LoadCache); + } + + /** */ public void LocalLoadCache(ICacheEntryFilter p, params object[] args) { LoadCache0(p, args, (int)CacheOp.LocLoadCache); } + /** */ + public Task LocalLoadCacheAsync(ICacheEntryFilter p, params object[] args) + { + AsyncInstance.LocalLoadCache(p, args); + + return AsyncInstance.GetTask(CacheOp.LocLoadCache); + } + /// /// Loads the cache. /// @@ -276,7 +288,15 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(key, "key"); return DoOutOp((int)CacheOp.ContainsKey, key) == True; - } + } + + /** */ + public Task ContainsKeyAsync(TK key) + { + AsyncInstance.ContainsKey(key); + + return AsyncInstance.GetTask(CacheOp.ContainsKey); + } /** */ public bool ContainsKeys(IEnumerable keys) @@ -284,7 +304,15 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(keys, "keys"); return DoOutOp((int)CacheOp.ContainsKeys, writer => WriteEnumerable(writer, keys)) == True; - } + } + + /** */ + public Task ContainsKeysAsync(IEnumerable keys) + { + AsyncInstance.ContainsKeys(keys); + + return AsyncInstance.GetTask(CacheOp.ContainsKeys); + } /** */ public TV LocalPeek(TK key, params CachePeekMode[] modes) @@ -355,6 +383,20 @@ namespace Apache.Ignite.Core.Impl.Cache } /** */ + public Task GetAsync(TK key) + { + AsyncInstance.Get(key); + + return AsyncInstance.GetTask(CacheOp.Get, reader => + { + if (reader != null) + return reader.ReadObject(); + + throw GetKeyNotFoundException(); + }); + } + + /** */ public bool TryGet(TK key, out TV value) { IgniteArgumentCheck.NotNull(key, "key"); @@ -370,6 +412,16 @@ namespace Apache.Ignite.Core.Impl.Cache } /** */ + public Task> TryGetAsync(TK key) + { + IgniteArgumentCheck.NotNull(key, "key"); + + AsyncInstance.Get(key); + + return AsyncInstance.GetTask(CacheOp.Get, GetCacheResult); + } + + /** */ public IDictionary GetAll(IEnumerable keys) { IgniteArgumentCheck.NotNull(keys, "keys"); @@ -384,6 +436,14 @@ namespace Apache.Ignite.Core.Impl.Cache }); } + /** */ + public Task> GetAllAsync(IEnumerable keys) + { + AsyncInstance.GetAll(keys); + + return AsyncInstance.GetTask(CacheOp.GetAll, r => r == null ? null : ReadGetAllDictionary(r)); + } + /** */ public void Put(TK key, TV val) { @@ -395,6 +455,14 @@ namespace Apache.Ignite.Core.Impl.Cache } /** */ + public Task PutAsync(TK key, TV val) + { + AsyncInstance.Put(key, val); + + return AsyncInstance.GetTask(CacheOp.Put); + } + + /** */ public CacheResult GetAndPut(TK key, TV val) { IgniteArgumentCheck.NotNull(key, "key"); @@ -405,13 +473,29 @@ namespace Apache.Ignite.Core.Impl.Cache } /** */ + public Task> GetAndPutAsync(TK key, TV val) + { + AsyncInstance.GetAndPut(key, val); + + return AsyncInstance.GetTask(CacheOp.GetAndPut, GetCacheResult); + } + + /** */ public CacheResult GetAndReplace(TK key, TV val) { IgniteArgumentCheck.NotNull(key, "key"); IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOpNullable((int)CacheOp.GetAndReplace, key, val); + return DoOutInOpNullable((int) CacheOp.GetAndReplace, key, val); + } + + /** */ + public Task> GetAndReplaceAsync(TK key, TV val) + { + AsyncInstance.GetAndReplace(key, val); + + return AsyncInstance.GetTask(CacheOp.GetAndReplace, GetCacheResult); } /** */ @@ -422,6 +506,14 @@ namespace Apache.Ignite.Core.Impl.Cache return DoOutInOpNullable((int)CacheOp.GetAndRemove, key); } + /** */ + public Task> GetAndRemoveAsync(TK key) + { + AsyncInstance.GetAndRemove(key); + + return AsyncInstance.GetTask(CacheOp.GetAndRemove, GetCacheResult); + } + /** */ public bool PutIfAbsent(TK key, TV val) { @@ -432,6 +524,14 @@ namespace Apache.Ignite.Core.Impl.Cache return DoOutOp((int) CacheOp.PutIfAbsent, key, val) == True; } + /** */ + public Task PutIfAbsentAsync(TK key, TV val) + { + AsyncInstance.PutIfAbsent(key, val); + + return AsyncInstance.GetTask(CacheOp.PutIfAbsent); + } + /** */ public CacheResult GetAndPutIfAbsent(TK key, TV val) { @@ -442,6 +542,14 @@ namespace Apache.Ignite.Core.Impl.Cache return DoOutInOpNullable((int)CacheOp.GetAndPutIfAbsent, key, val); } + /** */ + public Task> GetAndPutIfAbsentAsync(TK key, TV val) + { + AsyncInstance.GetAndPutIfAbsent(key, val); + + return AsyncInstance.GetTask(CacheOp.GetAndPutIfAbsent, GetCacheResult); + } + /** */ public bool Replace(TK key, TV val) { @@ -449,7 +557,15 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - return DoOutOp((int)CacheOp.Replace2, key, val) == True; + return DoOutOp((int) CacheOp.Replace2, key, val) == True; + } + + /** */ + public Task ReplaceAsync(TK key, TV val) + { + AsyncInstance.Replace(key, val); + + return AsyncInstance.GetTask(CacheOp.Replace2); } /** */ @@ -464,6 +580,14 @@ namespace Apache.Ignite.Core.Impl.Cache return DoOutOp((int)CacheOp.Replace3, key, oldVal, newVal) == True; } + /** */ + public Task ReplaceAsync(TK key, TV oldVal, TV newVal) + { + AsyncInstance.Replace(key, oldVal, newVal); + + return AsyncInstance.GetTask(CacheOp.Replace3); + } + /** */ public void PutAll(IDictionary vals) { @@ -471,7 +595,15 @@ namespace Apache.Ignite.Core.Impl.Cache DoOutOp((int) CacheOp.PutAll, writer => WriteDictionary(writer, vals)); } - + + /** */ + public Task PutAllAsync(IDictionary vals) + { + AsyncInstance.PutAll(vals); + + return AsyncInstance.GetTask(CacheOp.PutAll); + } + /** */ public void LocalEvict(IEnumerable keys) { @@ -486,12 +618,28 @@ namespace Apache.Ignite.Core.Impl.Cache UU.CacheClear(Target); } + /** */ + public Task ClearAsync() + { + AsyncInstance.Clear(); + + return AsyncInstance.GetTask(); + } + /** */ public void Clear(TK key) { IgniteArgumentCheck.NotNull(key, "key"); - DoOutOp((int)CacheOp.Clear, key); + DoOutOp((int) CacheOp.Clear, key); + } + + /** */ + public Task ClearAsync(TK key) + { + AsyncInstance.Clear(key); + + return AsyncInstance.GetTask(CacheOp.Clear); } /** */ @@ -502,6 +650,14 @@ namespace Apache.Ignite.Core.Impl.Cache DoOutOp((int)CacheOp.ClearAll, writer => WriteEnumerable(writer, keys)); } + /** */ + public Task ClearAllAsync(IEnumerable keys) + { + AsyncInstance.ClearAll(keys); + + return AsyncInstance.GetTask(CacheOp.ClearAll); + } + /** */ public void LocalClear(TK key) { @@ -523,7 +679,15 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutOp((int)CacheOp.RemoveObj, key) == True; + return DoOutOp((int) CacheOp.RemoveObj, key) == True; + } + + /** */ + public Task RemoveAsync(TK key) + { + AsyncInstance.Remove(key); + + return AsyncInstance.GetTask(CacheOp.RemoveObj); } /** */ @@ -537,6 +701,14 @@ namespace Apache.Ignite.Core.Impl.Cache } /** */ + public Task RemoveAsync(TK key, TV val) + { + AsyncInstance.Remove(key, val); + + return AsyncInstance.GetTask(CacheOp.RemoveBool); + } + + /** */ public void RemoveAll(IEnumerable keys) { IgniteArgumentCheck.NotNull(keys, "keys"); @@ -545,12 +717,28 @@ namespace Apache.Ignite.Core.Impl.Cache } /** */ + public Task RemoveAllAsync(IEnumerable keys) + { + AsyncInstance.RemoveAll(keys); + + return AsyncInstance.GetTask(CacheOp.RemoveAll); + } + + /** */ public void RemoveAll() { UU.CacheRemoveAll(Target); } /** */ + public Task RemoveAllAsync() + { + AsyncInstance.RemoveAll(); + + return AsyncInstance.GetTask(); + } + + /** */ public int GetLocalSize(params CachePeekMode[] modes) { return Size0(true, modes); @@ -562,6 +750,14 @@ namespace Apache.Ignite.Core.Impl.Cache return Size0(false, modes); } + /** */ + public Task GetSizeAsync(params CachePeekMode[] modes) + { + AsyncInstance.GetSize(modes); + + return AsyncInstance.GetTask(); + } + /// /// Internal size routine. /// @@ -601,6 +797,25 @@ namespace Apache.Ignite.Core.Impl.Cache input => GetResultOrThrow(Unmarshal(input))); } + /** */ + public Task InvokeAsync(TK key, ICacheEntryProcessor processor, TArg arg) + { + AsyncInstance.Invoke(key, processor, arg); + + return AsyncInstance.GetTask(CacheOp.Invoke, r => + { + if (r == null) + return default(TRes); + + var hasError = r.ReadBoolean(); + + if (hasError) + throw ReadException(r.Stream); + + return r.ReadObject(); + }); + } + /** */ public IDictionary> InvokeAll(IEnumerable keys, ICacheEntryProcessor processor, TArg arg) @@ -612,19 +827,21 @@ namespace Apache.Ignite.Core.Impl.Cache var holder = new CacheEntryProcessorHolder(processor, arg, (e, a) => processor.Process((IMutableCacheEntry)e, (TArg)a), typeof(TK), typeof(TV)); - return DoOutInOp((int)CacheOp.InvokeAll, writer => - { - WriteEnumerable(writer, keys); - writer.Write(holder); - }, - input => - { - if (IsAsync) - _invokeAllConverter.Value = (Func>>) - (reader => ReadInvokeAllResults(reader.Stream)); + return DoOutInOp((int) CacheOp.InvokeAll, + writer => + { + WriteEnumerable(writer, keys); + writer.Write(holder); + }, + input => ReadInvokeAllResults(input)); + } - return ReadInvokeAllResults(input); - }); + /** */ + public Task>> InvokeAllAsync(IEnumerable keys, ICacheEntryProcessor processor, TArg arg) + { + AsyncInstance.InvokeAll(keys, processor, arg); + + return AsyncInstance.GetTask(CacheOp.InvokeAll, reader => ReadInvokeAllResults(reader.Stream)); } /** */ @@ -673,9 +890,9 @@ namespace Apache.Ignite.Core.Impl.Cache } /** */ - public IFuture Rebalance() + public Task Rebalance() { - return GetFuture((futId, futTyp) => UU.CacheRebalance(Target, futId)); + return GetFuture((futId, futTyp) => UU.CacheRebalance(Target, futId)).Task; } /** */ @@ -689,11 +906,11 @@ namespace Apache.Ignite.Core.Impl.Cache } /// - /// Gets a value indicating whether this instance is in no-retries mode. + /// Gets the asynchronous instance. /// - internal bool IsNoRetries + private CacheImpl AsyncInstance { - get { return _flagNoRetries; } + get { return _asyncInstance.Value; } } #region Queries @@ -970,59 +1187,15 @@ namespace Apache.Ignite.Core.Impl.Cache } /// - /// Gets the future result converter based on the last operation id. + /// Gets the cache result. /// - /// The type of the future result. - /// The last op id. - /// Future result converter. - private Func GetFutureResultConverter(CacheOp lastAsyncOpId) + private static CacheResult GetCacheResult(PortableReaderImpl reader) { - switch (lastAsyncOpId) - { - case CacheOp.Get: - return reader => - { - if (reader != null) - return reader.ReadObject(); - - throw GetKeyNotFoundException(); - }; - - case CacheOp.GetAll: - return reader => reader == null ? default(TResult) : (TResult) ReadGetAllDictionary(reader); + var res = reader == null + ? new CacheResult() + : new CacheResult(reader.ReadObject()); - case CacheOp.Invoke: - return reader => - { - if (reader == null) - return default(TResult); - - var hasError = reader.ReadBoolean(); - - if (hasError) - throw ReadException(reader.Stream); - - return reader.ReadObject(); - }; - - case CacheOp.InvokeAll: - return _invokeAllConverter.Value as Func; - - case CacheOp.GetAndPut: - case CacheOp.GetAndPutIfAbsent: - case CacheOp.GetAndRemove: - case CacheOp.GetAndReplace: - return reader => - { - var res = reader == null - ? new CacheResult() - : new CacheResult(reader.ReadObject()); - - return TypeCaster.Cast(res); - }; - } - - return null; + return res; } /// http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs deleted file mode 100644 index aaaf8c3..0000000 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheProxyImpl.cs +++ /dev/null @@ -1,519 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Cache -{ - using System.Collections; - using System.Collections.Generic; - using System.Diagnostics; - using System.Diagnostics.CodeAnalysis; - using System.Threading; - using Apache.Ignite.Core.Cache; - using Apache.Ignite.Core.Cache.Expiry; - using Apache.Ignite.Core.Cache.Query; - using Apache.Ignite.Core.Cache.Query.Continuous; - using Apache.Ignite.Core.Common; - - /// - /// Cache proxy. - /// - [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] - internal class CacheProxyImpl : ICache - { - /** wrapped cache instance */ - private readonly CacheImpl _cache; - - /** */ - private readonly ThreadLocal _lastAsyncOp = new ThreadLocal(() => CacheOp.None); - - /// - /// Initializes a new instance of the class. - /// - /// The cache to wrap. - public CacheProxyImpl(CacheImpl cache) - { - Debug.Assert(cache != null); - - _cache = cache; - } - - /** */ - public ICache WithSkipStore() - { - return _cache.IsSkipStore ? this : new CacheProxyImpl((CacheImpl)_cache.WithSkipStore()); - } - - /** */ - public ICache WithExpiryPolicy(IExpiryPolicy plc) - { - return new CacheProxyImpl((CacheImpl)_cache.WithExpiryPolicy(plc)); - } - - /** */ - public ICache WithAsync() - { - return IsAsync ? this : new CacheProxyImpl((CacheImpl) _cache.WithAsync()); - } - - /** */ - public bool IsAsync - { - get { return _cache.IsAsync; } - } - - /** */ - public IFuture GetFuture() - { - return GetFuture(); - } - - /** */ - public IFuture GetFuture() - { - var fut = _cache.GetFuture(_lastAsyncOp.Value); - - ClearLastAsyncOp(); - - return fut; - } - - /** */ - public IEnumerator> GetEnumerator() - { - return _cache.GetEnumerator(); - } - - /** */ - IEnumerator IEnumerable.GetEnumerator() - { - return ((IEnumerable) _cache).GetEnumerator(); - } - - /** */ - public string Name - { - get { return _cache.Name; } - } - - /** */ - public IIgnite Ignite - { - get { return _cache.Ignite; } - } - - /** */ - - public bool IsEmpty() - { - return _cache.IsEmpty(); - } - - /** */ - public bool IsKeepPortable - { - get { return _cache.IsKeepPortable; } - } - - /// - /// Skip store flag. - /// - internal bool SkipStore - { - get { return _cache.IsSkipStore; } - } - - /** */ - public ICache WithKeepPortable() - { - return new CacheProxyImpl((CacheImpl) _cache.WithKeepPortable()); - } - - /** */ - public void LoadCache(ICacheEntryFilter p, params object[] args) - { - _cache.LoadCache(p, args); - - SetLastAsyncOp(CacheOp.LoadCache); - } - - /** */ - public void LocalLoadCache(ICacheEntryFilter p, params object[] args) - { - _cache.LocalLoadCache(p, args); - - SetLastAsyncOp(CacheOp.LocLoadCache); - } - - /** */ - public bool ContainsKey(TK key) - { - var result = _cache.ContainsKey(key); - - SetLastAsyncOp(CacheOp.ContainsKey); - - return result; - } - - /** */ - public bool ContainsKeys(IEnumerable keys) - { - var result = _cache.ContainsKeys(keys); - - SetLastAsyncOp(CacheOp.ContainsKeys); - - return result; - } - - /** */ - public TV LocalPeek(TK key, params CachePeekMode[] modes) - { - return _cache.LocalPeek(key, modes); - } - - /** */ - public bool TryLocalPeek(TK key, out TV value, params CachePeekMode[] modes) - { - return _cache.TryLocalPeek(key, out value, modes); - } - - /** */ - public TV this[TK key] - { - get { return _cache[key]; } - set { _cache[key] = value; } - } - - /** */ - public TV Get(TK key) - { - var result = _cache.Get(key); - - SetLastAsyncOp(CacheOp.Get); - - return result; - } - - /** */ - public bool TryGet(TK key, out TV value) - { - return _cache.TryGet(key, out value); - } - - /** */ - public IDictionary GetAll(IEnumerable keys) - { - var result = _cache.GetAll(keys); - - SetLastAsyncOp(CacheOp.GetAll); - - return result; - } - - /** */ - public void Put(TK key, TV val) - { - _cache.Put(key, val); - - SetLastAsyncOp(CacheOp.Put); - } - - /** */ - public CacheResult GetAndPut(TK key, TV val) - { - var result = _cache.GetAndPut(key, val); - - SetLastAsyncOp(CacheOp.GetAndPut); - - return result; - } - - /** */ - public CacheResult GetAndReplace(TK key, TV val) - { - var result = _cache.GetAndReplace(key, val); - - SetLastAsyncOp(CacheOp.GetAndReplace); - - return result; - } - - /** */ - public CacheResult GetAndRemove(TK key) - { - var result = _cache.GetAndRemove(key); - - SetLastAsyncOp(CacheOp.GetAndRemove); - - return result; - } - - /** */ - public bool PutIfAbsent(TK key, TV val) - { - var result = _cache.PutIfAbsent(key, val); - - SetLastAsyncOp(CacheOp.PutIfAbsent); - - return result; - } - - /** */ - public CacheResult GetAndPutIfAbsent(TK key, TV val) - { - var result = _cache.GetAndPutIfAbsent(key, val); - - SetLastAsyncOp(CacheOp.GetAndPutIfAbsent); - - return result; - } - - /** */ - public bool Replace(TK key, TV val) - { - var result = _cache.Replace(key, val); - - SetLastAsyncOp(CacheOp.Replace2); - - return result; - } - - /** */ - public bool Replace(TK key, TV oldVal, TV newVal) - { - var result = _cache.Replace(key, oldVal, newVal); - - SetLastAsyncOp(CacheOp.Replace3); - - return result; - } - - /** */ - public void PutAll(IDictionary vals) - { - _cache.PutAll(vals); - - SetLastAsyncOp(CacheOp.PutAll); - } - - /** */ - public void LocalEvict(IEnumerable keys) - { - _cache.LocalEvict(keys); - } - - /** */ - public void Clear() - { - _cache.Clear(); - - ClearLastAsyncOp(); - } - - /** */ - public void Clear(TK key) - { - _cache.Clear(key); - - SetLastAsyncOp(CacheOp.Clear); - } - - /** */ - public void ClearAll(IEnumerable keys) - { - _cache.ClearAll(keys); - - SetLastAsyncOp(CacheOp.ClearAll); - } - - /** */ - public void LocalClear(TK key) - { - _cache.LocalClear(key); - } - - /** */ - public void LocalClearAll(IEnumerable keys) - { - _cache.LocalClearAll(keys); - } - - /** */ - public bool Remove(TK key) - { - var result = _cache.Remove(key); - - SetLastAsyncOp(CacheOp.RemoveObj); - - return result; - } - - /** */ - public bool Remove(TK key, TV val) - { - var result = _cache.Remove(key, val); - - SetLastAsyncOp(CacheOp.RemoveBool); - - return result; - } - - /** */ - public void RemoveAll(IEnumerable keys) - { - _cache.RemoveAll(keys); - - SetLastAsyncOp(CacheOp.RemoveAll); - } - - /** */ - public void RemoveAll() - { - _cache.RemoveAll(); - - ClearLastAsyncOp(); - } - - /** */ - public int GetLocalSize(params CachePeekMode[] modes) - { - return _cache.GetLocalSize(modes); - } - - /** */ - public int GetSize(params CachePeekMode[] modes) - { - var result = _cache.GetSize(modes); - - ClearLastAsyncOp(); - - return result; - } - - /** */ - public void LocalPromote(IEnumerable keys) - { - _cache.LocalPromote(keys); - } - - /** */ - public IQueryCursor> Query(QueryBase qry) - { - return _cache.Query(qry); - } - - /** */ - public IQueryCursor QueryFields(SqlFieldsQuery qry) - { - return _cache.QueryFields(qry); - } - - /** */ - public IContinuousQueryHandle QueryContinuous(ContinuousQuery qry) - { - return _cache.QueryContinuous(qry); - } - - /** */ - public IContinuousQueryHandle> QueryContinuous(ContinuousQuery qry, QueryBase initialQry) - { - return _cache.QueryContinuous(qry, initialQry); - } - - /** */ - public IEnumerable> GetLocalEntries(params CachePeekMode[] peekModes) - { - return _cache.GetLocalEntries(peekModes); - } - - /** */ - public TRes Invoke(TK key, ICacheEntryProcessor processor, TArg arg) - { - var result = _cache.Invoke(key, processor, arg); - - SetLastAsyncOp(CacheOp.Invoke); - - return result; - } - - /** */ - public IDictionary> InvokeAll(IEnumerable keys, - ICacheEntryProcessor processor, TArg arg) - { - var result = _cache.InvokeAll(keys, processor, arg); - - SetLastAsyncOp(CacheOp.InvokeAll); - - return result; - } - - /** */ - public ICacheLock Lock(TK key) - { - return _cache.Lock(key); - } - - /** */ - public ICacheLock LockAll(IEnumerable keys) - { - return _cache.LockAll(keys); - } - - /** */ - public bool IsLocalLocked(TK key, bool byCurrentThread) - { - return _cache.IsLocalLocked(key, byCurrentThread); - } - - /** */ - public ICacheMetrics GetMetrics() - { - return _cache.GetMetrics(); - } - - /** */ - public IFuture Rebalance() - { - return _cache.Rebalance(); - } - - /** */ - public ICache WithNoRetries() - { - return _cache.IsNoRetries ? this : new CacheProxyImpl((CacheImpl) _cache.WithNoRetries()); - } - - /// - /// Sets the last asynchronous op id. - /// - /// The op identifier. - private void SetLastAsyncOp(CacheOp opId) - { - if (IsAsync) - _lastAsyncOp.Value = opId; - } - - /// - /// Clears the last asynchronous op id. - /// This should be called in the end of each method that supports async and does not call SetLastAsyncOp. - /// - private void ClearLastAsyncOp() - { - if (IsAsync) - _lastAsyncOp.Value = CacheOp.None; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/AsyncResult.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/AsyncResult.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/AsyncResult.cs deleted file mode 100644 index 4e5c396..0000000 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/AsyncResult.cs +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Common -{ - using System; - using System.Diagnostics.CodeAnalysis; - using System.Threading; - using Apache.Ignite.Core.Common; - - /// - /// Adapts IGridFuture to the IAsyncResult. - /// - [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable", - Justification = "Implementing IDisposable has no point since we return this class as IAsyncResult " + - "to the client, and IAsyncResult is not IDisposable.")] - public class AsyncResult : IAsyncResult - { - /** */ - private readonly ManualResetEvent _waitHandle; - - /// - /// Initializes a new instance of the class. - /// - /// The future to wrap. - public AsyncResult(IFuture fut) - { - _waitHandle = new ManualResetEvent(false); - - fut.Listen(() => _waitHandle.Set()); - } - - /** */ - public bool IsCompleted - { - get { return _waitHandle.WaitOne(0); } - } - - /** */ - public WaitHandle AsyncWaitHandle - { - get { return _waitHandle; } - } - - /** */ - public object AsyncState - { - get { return null; } - } - - /** */ - public bool CompletedSynchronously - { - get { return false; } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CompletedAsyncResult.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CompletedAsyncResult.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CompletedAsyncResult.cs deleted file mode 100644 index febe969..0000000 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CompletedAsyncResult.cs +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Common -{ - using System; - using System.Diagnostics.CodeAnalysis; - using System.Threading; - - /// - /// Represents an IAsyncResult that is completed. - /// - [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable", - Justification = "Implementing IDisposable has no point since we return this class as IAsyncResult " + - "to the client, and IAsyncResult is not IDisposable.")] - public class CompletedAsyncResult : IAsyncResult - { - /** */ - private readonly WaitHandle _asyncWaitHandle = new ManualResetEvent(true); - - /** */ - public bool IsCompleted - { - get { return true; } - } - - /** */ - public WaitHandle AsyncWaitHandle - { - get { return _asyncWaitHandle; } - } - - /** */ - public object AsyncState - { - get { return null; } - } - - /** */ - public bool CompletedSynchronously - { - get { return false; } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs index 70bebc4..4bf8a32 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs @@ -18,11 +18,8 @@ namespace Apache.Ignite.Core.Impl.Common { using System; - using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; - using System.Threading; using System.Threading.Tasks; - using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Impl.Portable.IO; /// @@ -30,22 +27,13 @@ namespace Apache.Ignite.Core.Impl.Common /// [SuppressMessage("ReSharper", "ParameterHidesMember")] [CLSCompliant(false)] - public sealed class Future : IFutureInternal, IFuture + public sealed class Future : IFutureInternal { /** Converter. */ private readonly IFutureConverter _converter; - /** Result. */ - private T _res; - - /** Caught cxception. */ - private Exception _err; - - /** Done flag. */ - private volatile bool _done; - - /** Listener(s). Either Action or List{Action}. */ - private object _callbacks; + /** Task completion source. */ + private readonly TaskCompletionSource _taskCompletionSource = new TaskCompletionSource(); /// /// Constructor. @@ -57,139 +45,22 @@ namespace Apache.Ignite.Core.Impl.Common } /** */ - public bool IsDone - { - get { return _done; } - } - - /** */ public T Get() { - if (!_done) + try { - lock (this) - { - while (!_done) - Monitor.Wait(this); - } + return Task.Result; } - - return Get0(); - } - - /** */ - public T Get(TimeSpan timeout) - { - long ticks = timeout.Ticks; - - if (ticks < 0) - throw new ArgumentException("Timeout cannot be negative."); - - if (ticks == 0) - return Get(); - - if (!_done) + catch (AggregateException ex) { - // Fallback to locked mode. - lock (this) - { - long endTime = DateTime.Now.Ticks + ticks; - - if (!_done) - { - while (true) - { - Monitor.Wait(this, timeout); - - if (_done) - break; - - ticks = endTime - DateTime.Now.Ticks; - - if (ticks <= 0) - throw new TimeoutException("Timeout waiting for future completion."); - - timeout = TimeSpan.FromTicks(ticks); - } - } - } + throw ex.InnerException; } - - return Get0(); - } - - /** */ - public void Listen(Action callback) - { - Listen((Action>) (fut => callback())); } /** */ - public void Listen(Action callback) + public Task Task { - Listen((Action>)callback); - } - - /** */ - public void Listen(Action> callback) - { - IgniteArgumentCheck.NotNull(callback, "callback"); - - if (!_done) - { - lock (this) - { - if (!_done) - { - AddCallback(callback); - - return; - } - } - } - - callback(this); - } - - /// - /// Get result or throw an error. - /// - private T Get0() - { - if (_err != null) - throw _err; - - return _res; - } - - /** */ - public IAsyncResult ToAsyncResult() - { - return _done ? (IAsyncResult) new CompletedAsyncResult() : new AsyncResult(this); - } - - /** */ - Task IFuture.ToTask() - { - return Task.Factory.FromAsync(ToAsyncResult(), x => (object) Get()); - } - - /** */ - public Task ToTask() - { - return Task.Factory.FromAsync(ToAsyncResult(), x => Get()); - } - - /** */ - object IFuture.Get(TimeSpan timeout) - { - return Get(timeout); - } - - /** */ - object IFuture.Get() - { - return Get(); + get { return _taskCompletionSource.Task; } } /** */ @@ -209,7 +80,7 @@ namespace Apache.Ignite.Core.Impl.Common /** */ public void OnError(Exception err) { - OnDone(default(T), err); + _taskCompletionSource.TrySetException(err); } /** */ @@ -238,7 +109,7 @@ namespace Apache.Ignite.Core.Impl.Common /// Result. internal void OnResult(T res) { - OnDone(res, null); + _taskCompletionSource.TrySetResult(res); } /// @@ -248,54 +119,10 @@ namespace Apache.Ignite.Core.Impl.Common /// Error. public void OnDone(T res, Exception err) { - object callbacks0 = null; - - lock (this) - { - if (!_done) - { - _res = res; - _err = err; - - _done = true; - - Monitor.PulseAll(this); - - // Notify listeners outside the lock - callbacks0 = _callbacks; - _callbacks = null; - } - } - - if (callbacks0 != null) - { - var list = callbacks0 as List>>; - - if (list != null) - list.ForEach(x => x(this)); - else - ((Action>) callbacks0)(this); - } - } - - /// - /// Adds a callback. - /// - private void AddCallback(Action> callback) - { - if (_callbacks == null) - { - _callbacks = callback; - - return; - } - - var list = _callbacks as List>> ?? - new List>> {(Action>) _callbacks}; - - list.Add(callback); - - _callbacks = list; + if (err != null) + OnError(err); + else + OnResult(res); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs index d7fc59f..d0e920a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs @@ -20,8 +20,8 @@ namespace Apache.Ignite.Core.Impl.Compute using System; using System.Collections.Generic; using System.Diagnostics; + using System.Threading.Tasks; using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Compute; /// @@ -44,30 +44,6 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ - public ICompute WithAsync() - { - return new ComputeAsync(_compute); - } - - /** */ - public bool IsAsync - { - get { return false; } - } - - /** */ - public IFuture GetFuture() - { - throw IgniteUtils.GetAsyncModeDisabledException(); - } - - /** */ - public IFuture GetFuture() - { - throw IgniteUtils.GetAsyncModeDisabledException(); - } - - /** */ public IClusterGroup ClusterGroup { get { return _compute.ClusterGroup; } @@ -104,53 +80,108 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task ExecuteJavaTaskAsync(string taskName, object taskArg) + { + return _compute.ExecuteJavaTaskAsync(taskName, taskArg).Task; + } + + /** */ public TReduceRes Execute(IComputeTask task, TArg taskArg) { return _compute.Execute(task, taskArg).Get(); } /** */ + public Task ExecuteAsync(IComputeTask task, TArg taskArg) + { + return _compute.Execute(task, taskArg).Task; + } + + /** */ public TJobRes Execute(IComputeTask task) { return _compute.Execute(task, null).Get(); } /** */ + public Task ExecuteAsync(IComputeTask task) + { + return _compute.Execute(task, null).Task; + } + + /** */ public TReduceRes Execute(Type taskType, TArg taskArg) { return _compute.Execute(taskType, taskArg).Get(); } + /** */ + public Task ExecuteAsync(Type taskType, TArg taskArg) + { + return _compute.Execute(taskType, taskArg).Task; + } + + /** */ public TReduceRes Execute(Type taskType) { return _compute.Execute(taskType, null).Get(); } /** */ + public Task ExecuteAsync(Type taskType) + { + return _compute.Execute(taskType, null).Task; + } + + /** */ public TJobRes Call(IComputeFunc clo) { return _compute.Execute(clo).Get(); } /** */ + public Task CallAsync(IComputeFunc clo) + { + return _compute.Execute(clo).Task; + } + + /** */ public TJobRes AffinityCall(string cacheName, object affinityKey, IComputeFunc clo) { return _compute.AffinityCall(cacheName, affinityKey, clo).Get(); } /** */ + public Task AffinityCallAsync(string cacheName, object affinityKey, IComputeFunc clo) + { + return _compute.AffinityCall(cacheName, affinityKey, clo).Task; + } + + /** */ public TJobRes Call(Func func) { return _compute.Execute(func).Get(); } /** */ + public Task CallAsync(IEnumerable> clos, IComputeReducer reducer) + { + return _compute.Execute(clos, reducer).Task; + } + + /** */ public ICollection Call(IEnumerable> clos) { return _compute.Execute(clos).Get(); } /** */ + public Task> CallAsync(IEnumerable> clos) + { + return _compute.Execute(clos).Task; + } + + /** */ public TReduceRes Call(IEnumerable> clos, IComputeReducer reducer) { @@ -164,52 +195,106 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task> BroadcastAsync(IComputeFunc clo) + { + return _compute.Broadcast(clo).Task; + } + + /** */ public ICollection Broadcast(IComputeFunc clo, T arg) { return _compute.Broadcast(clo, arg).Get(); } /** */ + public Task> BroadcastAsync(IComputeFunc clo, TArg arg) + { + return _compute.Broadcast(clo, arg).Task; + } + + /** */ public void Broadcast(IComputeAction action) { _compute.Broadcast(action).Get(); } /** */ + public Task BroadcastAsync(IComputeAction action) + { + return _compute.Broadcast(action).Task; + } + + /** */ public void Run(IComputeAction action) { _compute.Run(action).Get(); } /** */ + public Task RunAsync(IComputeAction action) + { + return _compute.Run(action).Task; + } + + /** */ public void AffinityRun(string cacheName, object affinityKey, IComputeAction action) { _compute.AffinityRun(cacheName, affinityKey, action).Get(); } /** */ + public Task AffinityRunAsync(string cacheName, object affinityKey, IComputeAction action) + { + return _compute.AffinityRun(cacheName, affinityKey, action).Task; + } + + /** */ public void Run(IEnumerable actions) { _compute.Run(actions).Get(); } /** */ + public Task RunAsync(IEnumerable actions) + { + return _compute.Run(actions).Task; + } + + /** */ public TJobRes Apply(IComputeFunc clo, TArg arg) { return _compute.Apply(clo, arg).Get(); } /** */ + public Task ApplyAsync(IComputeFunc clo, TArg arg) + { + return _compute.Apply(clo, arg).Task; + } + + /** */ public ICollection Apply(IComputeFunc clo, IEnumerable args) { return _compute.Apply(clo, args).Get(); } /** */ + public Task> ApplyAsync(IComputeFunc clo, IEnumerable args) + { + return _compute.Apply(clo, args).Task; + } + + /** */ public TReduceRes Apply(IComputeFunc clo, IEnumerable args, IComputeReducer rdc) { return _compute.Apply(clo, args, rdc).Get(); } + + /** */ + public Task ApplyAsync(IComputeFunc clo, IEnumerable args, IComputeReducer rdc) + { + return _compute.Apply(clo, args, rdc).Task; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs deleted file mode 100644 index 89c5b83..0000000 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeAsync.cs +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Impl.Compute -{ - using System; - using System.Collections.Generic; - using System.Diagnostics.CodeAnalysis; - using System.Globalization; - using System.Threading; - using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Common; - using Apache.Ignite.Core.Compute; - - /// - /// Asynchronous Compute facade. - /// - [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] - internal class ComputeAsync : ICompute - { - /** */ - protected readonly ComputeImpl Compute; - - /** Current future. */ - private readonly ThreadLocal _curFut = new ThreadLocal(); - - /// - /// Initializes a new instance of the class. - /// - /// The compute implementation. - internal ComputeAsync(ComputeImpl computeImpl) - { - Compute = computeImpl; - } - - /** */ - public ICompute WithAsync() - { - return this; - } - - /** */ - public bool IsAsync - { - get { return true; } - } - - /** */ - public IFuture GetFuture() - { - return GetFuture(); - } - - /** */ - public IFuture GetFuture() - { - var fut = _curFut.Value; - - if (fut == null) - throw new InvalidOperationException("Asynchronous operation not started."); - - var fut0 = fut as IFuture; - - if (fut0 == null) - throw new InvalidOperationException( - string.Format(CultureInfo.InvariantCulture, - "Requested future type {0} is incompatible with current future type {1}", - typeof (IFuture), fut.GetType())); - - _curFut.Value = null; - - return fut0; - } - - /** */ - public IClusterGroup ClusterGroup - { - get { return Compute.ClusterGroup; } - } - - /** */ - public ICompute WithNoFailover() - { - Compute.WithNoFailover(); - - return this; - } - - /** */ - public ICompute WithTimeout(long timeout) - { - Compute.WithTimeout(timeout); - - return this; - } - - /** */ - public ICompute WithKeepPortable() - { - Compute.WithKeepPortable(); - - return this; - } - - /** */ - public TReduceRes ExecuteJavaTask(string taskName, object taskArg) - { - _curFut.Value = Compute.ExecuteJavaTaskAsync(taskName, taskArg); - - return default(TReduceRes); - } - - /** */ - public TReduceRes Execute(IComputeTask task, TArg taskArg) - { - _curFut.Value = Compute.Execute(task, taskArg); - - return default(TReduceRes); - } - - /** */ - public TReduceRes Execute(IComputeTask task) - { - _curFut.Value = Compute.Execute(task, null); - - return default(TReduceRes); - } - - /** */ - public TReduceRes Execute(Type taskType, TArg taskArg) - { - _curFut.Value = Compute.Execute(taskType, taskArg); - - return default(TReduceRes); - } - - /** */ - public TReduceRes Execute(Type taskType) - { - _curFut.Value = Compute.Execute(taskType, null); - - return default(TReduceRes); - } - - /** */ - public TJobRes Call(IComputeFunc clo) - { - _curFut.Value = Compute.Execute(clo); - - return default(TJobRes); - } - - /** */ - public TJobRes AffinityCall(string cacheName, object affinityKey, IComputeFunc clo) - { - Compute.AffinityCall(cacheName, affinityKey, clo); - - return default(TJobRes); - } - - /** */ - public TJobRes Call(Func func) - { - _curFut.Value = Compute.Execute(func); - - return default(TJobRes); - } - - /** */ - public ICollection Call(IEnumerable> clos) - { - _curFut.Value = Compute.Execute(clos); - - return null; - } - - /** */ - public TReduceRes Call(IEnumerable> clos, IComputeReducer reducer) - { - _curFut.Value = Compute.Execute(clos, reducer); - - return default(TReduceRes); - } - - /** */ - public ICollection Broadcast(IComputeFunc clo) - { - _curFut.Value = Compute.Broadcast(clo); - - return null; - } - - /** */ - public ICollection Broadcast(IComputeFunc clo, TArg arg) - { - _curFut.Value = Compute.Broadcast(clo, arg); - - return null; - } - - /** */ - public void Broadcast(IComputeAction action) - { - _curFut.Value = Compute.Broadcast(action); - } - - /** */ - public void Run(IComputeAction action) - { - _curFut.Value = Compute.Run(action); - } - - /** */ - public void AffinityRun(string cacheName, object affinityKey, IComputeAction action) - { - Compute.AffinityRun(cacheName, affinityKey, action); - } - - /** */ - public void Run(IEnumerable actions) - { - _curFut.Value = Compute.Run(actions); - } - - /** */ - public TJobRes Apply(IComputeFunc clo, TArg arg) - { - _curFut.Value = Compute.Apply(clo, arg); - - return default(TJobRes); - } - - /** */ - public ICollection Apply(IComputeFunc clo, IEnumerable args) - { - _curFut.Value = Compute.Apply(clo, args); - - return null; - } - - /** */ - public TReduceRes Apply(IComputeFunc clo, - IEnumerable args, IComputeReducer rdc) - { - _curFut.Value = Compute.Apply(clo, args, rdc); - - return default(TReduceRes); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs index abd54da..7adc49f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs @@ -150,7 +150,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// If task for given name has not been deployed yet, /// then 'taskName' will be used as task class name to auto-deploy the task. /// - public IFuture ExecuteJavaTaskAsync(string taskName, object taskArg) + public Future ExecuteJavaTaskAsync(string taskName, object taskArg) { IgniteArgumentCheck.NotNullOrEmpty(taskName, "taskName"); @@ -158,7 +158,7 @@ namespace Apache.Ignite.Core.Impl.Compute try { - IFuture fut = null; + Future fut = null; DoOutInOp(OpExecAsync, writer => { @@ -183,7 +183,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Task to execute. /// Optional task argument. /// Task result. - public IFuture Execute(IComputeTask task, + public Future Execute(IComputeTask task, TArg taskArg) { IgniteArgumentCheck.NotNull(task, "task"); @@ -204,7 +204,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Task type. /// Optional task argument. /// Task result. - public IFuture Execute(Type taskType, TArg taskArg) + public Future Execute(Type taskType, TArg taskArg) { IgniteArgumentCheck.NotNull(taskType, "taskType"); @@ -224,7 +224,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// /// Job to execute. /// Job result for this execution. - public IFuture Execute(IComputeFunc clo) + public Future Execute(IComputeFunc clo) { IgniteArgumentCheck.NotNull(clo, "clo"); @@ -238,7 +238,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// /// Func to execute. /// Job result for this execution. - public IFuture Execute(Func func) + public Future Execute(Func func) { IgniteArgumentCheck.NotNull(func, "func"); @@ -253,7 +253,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// /// Collection of jobs to execute. /// Collection of job results for this execution. - public IFuture> Execute(IEnumerable> clos) + public Future> Execute(IEnumerable> clos) { IgniteArgumentCheck.NotNull(clos, "clos"); @@ -272,7 +272,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Collection of jobs to execute. /// Reducer to reduce all job results into one individual return value. /// Collection of job results for this execution. - public IFuture Execute(IEnumerable> clos, + public Future Execute(IEnumerable> clos, IComputeReducer rdc) { IgniteArgumentCheck.NotNull(clos, "clos"); @@ -290,7 +290,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// /// Job to broadcast to all projection nodes. /// Collection of results for this execution. - public IFuture> Broadcast(IComputeFunc clo) + public Future> Broadcast(IComputeFunc clo) { IgniteArgumentCheck.NotNull(clo, "clo"); @@ -305,7 +305,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Job to broadcast to all projection nodes. /// Job closure argument. /// Collection of results for this execution. - public IFuture> Broadcast(IComputeFunc clo, TArg arg) + public Future> Broadcast(IComputeFunc clo, TArg arg) { IgniteArgumentCheck.NotNull(clo, "clo"); @@ -317,7 +317,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Broadcasts given job to all nodes in grid projection. /// /// Job to broadcast to all projection nodes. - public IFuture Broadcast(IComputeAction action) + public Future Broadcast(IComputeAction action) { IgniteArgumentCheck.NotNull(action, "action"); @@ -329,7 +329,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Executes provided job on a node in this grid projection. /// /// Job to execute. - public IFuture Run(IComputeAction action) + public Future Run(IComputeAction action) { IgniteArgumentCheck.NotNull(action, "action"); @@ -341,7 +341,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Executes collection of jobs on Ignite nodes within this grid projection. /// /// Jobs to execute. - public IFuture Run(IEnumerable actions) + public Future Run(IEnumerable actions) { IgniteArgumentCheck.NotNull(actions, "actions"); @@ -369,7 +369,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Job to run. /// Job argument. /// Job result for this execution. - public IFuture Apply(IComputeFunc clo, TArg arg) + public Future Apply(IComputeFunc clo, TArg arg) { IgniteArgumentCheck.NotNull(clo, "clo"); @@ -385,7 +385,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Job to run. /// Job arguments. /// Collection of job results. - public IFuture> Apply(IComputeFunc clo, + public Future> Apply(IComputeFunc clo, IEnumerable args) { IgniteArgumentCheck.NotNull(clo, "clo"); @@ -413,7 +413,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Job arguments. /// Reducer to reduce all job results into one individual return value. /// Reduced job result for this execution. - public IFuture Apply(IComputeFunc clo, + public Future Apply(IComputeFunc clo, IEnumerable args, IComputeReducer rdc) { IgniteArgumentCheck.NotNull(clo, "clo"); @@ -440,7 +440,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Name of the cache to use for affinity co-location. /// Affinity key. /// Job to execute. - public IFuture AffinityRun(string cacheName, object affinityKey, IComputeAction action) + public Future AffinityRun(string cacheName, object affinityKey, IComputeAction action) { IgniteArgumentCheck.NotNull(action, "action"); @@ -458,7 +458,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Job to execute. /// Job result for this execution. /// Type of job result. - public IFuture AffinityCall(string cacheName, object affinityKey, IComputeFunc clo) + public Future AffinityCall(string cacheName, object affinityKey, IComputeFunc clo) { IgniteArgumentCheck.NotNull(clo, "clo"); @@ -483,7 +483,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Jobs. /// Broadcast flag. /// Future. - private IFuture ExecuteClosures0( + private Future ExecuteClosures0( IComputeTask task, IComputeJob job, ICollection jobs, bool broadcast) { @@ -503,7 +503,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// Future. [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "User code can throw any exception")] - private IFuture ExecuteClosures0( + private Future ExecuteClosures0( IComputeTask task, IComputeJob job = null, IEnumerable jobs = null, int opId = OpUnicast, int jobsCount = 0, Action writeAction = null) http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs index ef27889..1cd13a8 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs @@ -379,7 +379,7 @@ namespace Apache.Ignite.Core.Impl.Compute /// /// Task completion future. /// - internal IFuture Future + internal Future Future { get { return _fut; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs index 49cbc5a..576c805 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerBatch.cs @@ -22,7 +22,7 @@ namespace Apache.Ignite.Core.Impl.Datastream using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Threading; - using Apache.Ignite.Core.Common; + using System.Threading.Tasks; using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Portable; @@ -69,15 +69,15 @@ namespace Apache.Ignite.Core.Impl.Datastream if (prev != null) Thread.MemoryBarrier(); // Prevent "prev" field escape. - _fut.Listen(() => ParentsCompleted()); + _fut.Task.ContinueWith(x => ParentsCompleted()); } /// - /// Gets the future. + /// Gets the task. /// - public IFuture Future + public Task Task { - get { return _fut; } + get { return _fut.Task; } } /// @@ -264,7 +264,7 @@ namespace Apache.Ignite.Core.Impl.Datastream return false; } - return _fut.IsDone; + return _fut.Task.IsCompleted; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cc1aa533/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs index 9894e93..586d19f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs @@ -21,7 +21,7 @@ namespace Apache.Ignite.Core.Impl.Datastream using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Threading; - using Apache.Ignite.Core.Common; + using System.Threading.Tasks; using Apache.Ignite.Core.Datastream; using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Portable; @@ -326,13 +326,13 @@ namespace Apache.Ignite.Core.Impl.Datastream } /** */ - public IFuture Future + public Task Task { get { ThrowIfDisposed(); - return _closeFut; + return _closeFut.Task; } } @@ -396,7 +396,7 @@ namespace Apache.Ignite.Core.Impl.Datastream } /** */ - public IFuture AddData(TK key, TV val) + public Task AddData(TK key, TV val) { ThrowIfDisposed(); @@ -406,7 +406,7 @@ namespace Apache.Ignite.Core.Impl.Datastream } /** */ - public IFuture AddData(KeyValuePair pair) + public Task AddData(KeyValuePair pair) { ThrowIfDisposed(); @@ -414,7 +414,7 @@ namespace Apache.Ignite.Core.Impl.Datastream } /** */ - public IFuture AddData(ICollection> entries) + public Task AddData(ICollection> entries) { ThrowIfDisposed(); @@ -424,7 +424,7 @@ namespace Apache.Ignite.Core.Impl.Datastream } /** */ - public IFuture RemoveData(TK key) + public Task RemoveData(TK key) { ThrowIfDisposed(); @@ -585,7 +585,7 @@ namespace Apache.Ignite.Core.Impl.Datastream /// Value. /// Items count. /// Future. - private IFuture Add0(object val, int cnt) + private Task Add0(object val, int cnt) { int bufSndSize0 = _bufSndSize; @@ -610,7 +610,7 @@ namespace Apache.Ignite.Core.Impl.Datastream // Batch is too big, schedule flush. Flush0(batch0, false, PlcContinue); - return batch0.Future; + return batch0.Task; } }