Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4768D200B66 for ; Thu, 18 Aug 2016 17:14:24 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 44553160AAE; Thu, 18 Aug 2016 15:14:24 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1570E160A86 for ; Thu, 18 Aug 2016 17:14:21 +0200 (CEST) Received: (qmail 84242 invoked by uid 500); 18 Aug 2016 15:14:21 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 84233 invoked by uid 99); 18 Aug 2016 15:14:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Aug 2016 15:14:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E766CDFF87; Thu, 18 Aug 2016 15:14:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ptupitsyn@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: IGNITE-2943 .NET: Improve cache error propagation and interop performance Date: Thu, 18 Aug 2016 15:14:20 +0000 (UTC) archived-at: Thu, 18 Aug 2016 15:14:24 -0000 Repository: ignite Updated Branches: refs/heads/master 314eec5ea -> 6899e06fb IGNITE-2943 .NET: Improve cache error propagation and interop performance This closes #672 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6899e06f Tree: 
http://git-wip-us.apache.org/repos/asf/ignite/tree/6899e06f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6899e06f Branch: refs/heads/master Commit: 6899e06fbcfebff7b6de7e43b0f0b1107d649e00 Parents: 314eec5 Author: Pavel Tupitsyn Authored: Thu Aug 18 18:14:12 2016 +0300 Committer: Pavel Tupitsyn Committed: Thu Aug 18 18:14:12 2016 +0300 ---------------------------------------------------------------------- .../platform/PlatformAbstractTarget.java | 17 +- .../platform/cache/PlatformCache.java | 335 ++++++++++--------- .../dotnet/PlatformDotNetCacheStore.java | 12 +- .../platform/utils/PlatformFutureUtils.java | 6 +- .../platform/utils/PlatformUtils.java | 25 ++ .../include/ignite/impl/binary/binary_utils.h | 87 +++++ .../src/impl/binary/binary_reader_impl.cpp | 30 +- .../ignite/impl/interop/interop_target.h | 15 +- .../cpp/core/include/ignite/impl/operations.h | 47 ++- .../cpp/core/src/impl/cache/cache_impl.cpp | 14 +- .../core/src/impl/interop/interop_target.cpp | 41 ++- .../src/impl/transactions/transactions_impl.cpp | 5 + .../Apache.Ignite.Benchmarks/BenchmarkRunner.cs | 5 +- .../Interop/PlatformBenchmarkBase.cs | 2 +- .../Cache/CacheAbstractTest.cs | 2 +- .../Cache/Store/CacheStoreTest.cs | 39 ++- .../Cache/Store/CacheTestStore.cs | 50 ++- .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 239 ++++++------- .../Apache.Ignite.Core/Impl/ExceptionUtils.cs | 10 +- .../Apache.Ignite.Core/Impl/PlatformTarget.cs | 80 ++++- .../Impl/Unmanaged/UnmanagedCallbacks.cs | 20 +- 21 files changed, 745 insertions(+), 336 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java index 0cd683d..0ca4453 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java @@ -38,6 +38,9 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { /** Constant: FALSE. */ protected static final int FALSE = 0; + /** Constant: ERROR. */ + protected static final int ERROR = -1; + /** */ private static final int OP_META = -1; @@ -69,7 +72,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { return TRUE; } else - return processInStreamOutLong(type, reader); + return processInStreamOutLong(type, reader, mem); } catch (Exception e) { throw convertException(e); @@ -235,6 +238,18 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { } /** + * Process IN operation. + * + * @param type Type. + * @param reader Binary reader. + * @return Result. + * @throws IgniteCheckedException In case of exception. + */ + protected long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException { + return processInStreamOutLong(type, reader); + } + + /** * Process IN-OUT operation. * * @param type Type. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java index d572e8b..a7b6e41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java @@ -42,10 +42,13 @@ import org.apache.ignite.internal.processors.platform.PlatformNativeException; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery; import org.apache.ignite.internal.processors.platform.cache.query.PlatformFieldsQueryCursor; import org.apache.ignite.internal.processors.platform.cache.query.PlatformQueryCursor; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformListenable; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure; import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.C1; @@ -290,109 +293,207 @@ public class PlatformCache extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws 
IgniteCheckedException { - switch (type) { - case OP_PUT: - cache.put(reader.readObjectDetached(), reader.readObjectDetached()); + @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem) throws IgniteCheckedException { + try { + switch (type) { + case OP_PUT: + cache.put(reader.readObjectDetached(), reader.readObjectDetached()); - return TRUE; + return TRUE; - case OP_REMOVE_BOOL: - return cache.remove(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; + case OP_GET: + return writeResult(mem, cache.get(reader.readObjectDetached())); - case OP_REMOVE_ALL: - cache.removeAll(PlatformUtils.readSet(reader)); + case OP_REMOVE_BOOL: + return cache.remove(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; - return TRUE; + case OP_REMOVE_ALL: + cache.removeAll(PlatformUtils.readSet(reader)); - case OP_PUT_ALL: - cache.putAll(PlatformUtils.readMap(reader)); + return TRUE; - return TRUE; + case OP_PUT_ALL: + cache.putAll(PlatformUtils.readMap(reader)); - case OP_LOC_EVICT: - cache.localEvict(PlatformUtils.readCollection(reader)); + return TRUE; - return TRUE; + case OP_LOC_EVICT: + cache.localEvict(PlatformUtils.readCollection(reader)); - case OP_CONTAINS_KEY: - return cache.containsKey(reader.readObjectDetached()) ? TRUE : FALSE; + return TRUE; - case OP_CONTAINS_KEYS: - return cache.containsKeys(PlatformUtils.readSet(reader)) ? TRUE : FALSE; + case OP_CONTAINS_KEY: + return cache.containsKey(reader.readObjectDetached()) ? TRUE : FALSE; - case OP_LOC_PROMOTE: { - cache.localPromote(PlatformUtils.readSet(reader)); + case OP_CONTAINS_KEYS: + return cache.containsKeys(PlatformUtils.readSet(reader)) ? TRUE : FALSE; - break; - } + case OP_LOC_PROMOTE: { + cache.localPromote(PlatformUtils.readSet(reader)); - case OP_REPLACE_3: - return cache.replace(reader.readObjectDetached(), reader.readObjectDetached(), - reader.readObjectDetached()) ? 
TRUE : FALSE; + return TRUE; + } - case OP_LOC_LOAD_CACHE: - loadCache0(reader, true); + case OP_REPLACE_3: + return cache.replace(reader.readObjectDetached(), reader.readObjectDetached(), + reader.readObjectDetached()) ? TRUE : FALSE; - break; + case OP_LOC_LOAD_CACHE: + loadCache0(reader, true); - case OP_LOAD_CACHE: - loadCache0(reader, false); + return TRUE; - break; + case OP_LOAD_CACHE: + loadCache0(reader, false); - case OP_CLEAR: - cache.clear(reader.readObjectDetached()); + return TRUE; - break; + case OP_CLEAR: + cache.clear(reader.readObjectDetached()); - case OP_CLEAR_ALL: - cache.clearAll(PlatformUtils.readSet(reader)); + return TRUE; - break; + case OP_CLEAR_ALL: + cache.clearAll(PlatformUtils.readSet(reader)); - case OP_LOCAL_CLEAR: - cache.localClear(reader.readObjectDetached()); + return TRUE; - break; + case OP_LOCAL_CLEAR: + cache.localClear(reader.readObjectDetached()); - case OP_LOCAL_CLEAR_ALL: - cache.localClearAll(PlatformUtils.readSet(reader)); + return TRUE; - break; + case OP_LOCAL_CLEAR_ALL: + cache.localClearAll(PlatformUtils.readSet(reader)); - case OP_PUT_IF_ABSENT: { - return cache.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; - } + return TRUE; - case OP_REPLACE_2: { - return cache.replace(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; - } + case OP_PUT_IF_ABSENT: + return cache.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; - case OP_REMOVE_OBJ: { - return cache.remove(reader.readObjectDetached()) ? TRUE : FALSE; - } + case OP_REPLACE_2: + return cache.replace(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; + + case OP_REMOVE_OBJ: + return cache.remove(reader.readObjectDetached()) ? TRUE : FALSE; + + case OP_IS_LOCAL_LOCKED: + return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? 
TRUE : FALSE; + + case OP_LOAD_ALL: { + long futId = reader.readLong(); + boolean replaceExisting = reader.readBoolean(); + + CompletionListenable fut = new CompletionListenable(); + + PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, null, this); + + cache.loadAll(PlatformUtils.readSet(reader), replaceExisting, fut); + + return TRUE; + } + + case OP_GET_AND_PUT: + return writeResult(mem, cache.getAndPut(reader.readObjectDetached(), reader.readObjectDetached())); + + case OP_GET_AND_REPLACE: + return writeResult(mem, cache.getAndReplace(reader.readObjectDetached(), reader.readObjectDetached())); + + case OP_GET_AND_REMOVE: + return writeResult(mem, cache.getAndRemove(reader.readObjectDetached())); + + case OP_GET_AND_PUT_IF_ABSENT: + return writeResult(mem, cache.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached())); + + case OP_PEEK: { + Object key = reader.readObjectDetached(); + + CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt()); + + return writeResult(mem, cache.localPeek(key, modes)); + } + + case OP_GET_ALL: { + Set keys = PlatformUtils.readSet(reader); + + Map entries = cache.getAll(keys); + + return writeResult(mem, entries, new PlatformWriterClosure() { + @Override public void write(BinaryRawWriterEx writer, Map val) { + PlatformUtils.writeNullableMap(writer, val); + } + }); + } + + case OP_INVOKE: { + Object key = reader.readObjectDetached(); - case OP_IS_LOCAL_LOCKED: - return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? 
TRUE : FALSE; + CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); - case OP_LOAD_ALL: { - long futId = reader.readLong(); - boolean replaceExisting = reader.readBoolean(); + return writeResult(mem, cache.invoke(key, proc)); + } - CompletionListenable fut = new CompletionListenable(); + case OP_INVOKE_ALL: { + Set keys = PlatformUtils.readSet(reader); - PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, null, this); + CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); - cache.loadAll(PlatformUtils.readSet(reader), replaceExisting, fut); + Map results = cache.invokeAll(keys, proc); - return TRUE; + return writeResult(mem, results, new PlatformWriterClosure() { + @Override public void write(BinaryRawWriterEx writer, Map val) { + writeInvokeAllResult(writer, val); + } + }); + } + + case OP_LOCK: + return registerLock(cache.lock(reader.readObjectDetached())); + + case OP_LOCK_ALL: + return registerLock(cache.lockAll(PlatformUtils.readCollection(reader))); } + } + catch (Exception e) { + PlatformOutputStream out = mem.output(); + BinaryRawWriterEx writer = platformCtx.writer(out); - default: - return super.processInStreamOutLong(type, reader); + Exception err = convertException(e); + + PlatformUtils.writeError(err, writer); + PlatformUtils.writeErrorData(err, writer); + + out.synchronize(); + + return ERROR; } + return super.processInStreamOutLong(type, reader, mem); + } + + /** + * Writes the result to reused stream, if any. + */ + private long writeResult(PlatformMemory mem, Object obj) { + return writeResult(mem, obj, null); + } + + /** + * Writes the result to reused stream, if any. 
+ */ + private long writeResult(PlatformMemory mem, Object obj, PlatformWriterClosure clo) { + if (obj == null) + return FALSE; + + PlatformOutputStream out = mem.output(); + BinaryRawWriterEx writer = platformCtx.writer(out); + + if (clo == null) + writer.writeObjectDetached(obj); + else + clo.write(writer, obj); + + out.synchronize(); return TRUE; } @@ -555,106 +656,6 @@ public class PlatformCache extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"}) - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) - throws IgniteCheckedException { - switch (type) { - case OP_GET: { - writer.writeObjectDetached(cache.get(reader.readObjectDetached())); - - break; - } - - case OP_GET_AND_PUT: { - writer.writeObjectDetached(cache.getAndPut(reader.readObjectDetached(), reader.readObjectDetached())); - - break; - } - - case OP_GET_AND_REPLACE: { - writer.writeObjectDetached(cache.getAndReplace(reader.readObjectDetached(), - reader.readObjectDetached())); - - break; - } - - case OP_GET_AND_REMOVE: { - writer.writeObjectDetached(cache.getAndRemove(reader.readObjectDetached())); - - break; - } - - case OP_GET_AND_PUT_IF_ABSENT: { - writer.writeObjectDetached(cache.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached())); - - break; - } - - case OP_PEEK: { - Object key = reader.readObjectDetached(); - - CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt()); - - writer.writeObjectDetached(cache.localPeek(key, modes)); - - break; - } - - case OP_GET_ALL: { - Set keys = PlatformUtils.readSet(reader); - - Map entries = cache.getAll(keys); - - PlatformUtils.writeNullableMap(writer, entries); - - break; - } - - case OP_INVOKE: { - Object key = reader.readObjectDetached(); - - CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); - - try { - 
writer.writeObjectDetached(cache.invoke(key, proc)); - } - catch (EntryProcessorException ex) - { - if (ex.getCause() instanceof PlatformNativeException) - writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause()); - else - throw ex; - } - - break; - } - - case OP_INVOKE_ALL: { - Set keys = PlatformUtils.readSet(reader); - - CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); - - writeInvokeAllResult(writer, cache.invokeAll(keys, proc)); - - break; - } - - case OP_LOCK: - writer.writeLong(registerLock(cache.lock(reader.readObjectDetached()))); - - break; - - case OP_LOCK_ALL: - writer.writeLong(registerLock(cache.lockAll(PlatformUtils.readCollection(reader)))); - - break; - - default: - super.processInStreamOutStream(type, reader, writer); - } - } - - /** {@inheritDoc} */ @Override public Exception convertException(Exception e) { if (e instanceof CachePartialUpdateException) return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e.getCause(), @@ -699,7 +700,7 @@ public class PlatformCache extends PlatformAbstractTarget { catch (Exception ex) { writer.writeBoolean(true); // Exception - writeError(writer, ex); + PlatformUtils.writeError(ex, writer); } } } @@ -1033,7 +1034,7 @@ public class PlatformCache extends PlatformAbstractTarget { else { writer.writeBoolean(true); // Error. 
- writeError(writer, (Exception) err); + PlatformUtils.writeError(err, writer); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java index 1c60a42..d38fd8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java @@ -389,10 +389,9 @@ public class PlatformDotNetCacheStore implements CacheStore, Platfor * * @param task Task. * @param cb Optional callback. - * @return Result. * @throws org.apache.ignite.IgniteCheckedException If failed. 
*/ - protected int doInvoke(IgniteInClosureX task, @Nullable PlatformCacheStoreCallback cb) + protected void doInvoke(IgniteInClosureX task, @Nullable PlatformCacheStoreCallback cb) throws IgniteCheckedException{ try (PlatformMemory mem = platformCtx.memory().allocate()) { PlatformOutputStream out = mem.output(); @@ -403,7 +402,14 @@ public class PlatformDotNetCacheStore implements CacheStore, Platfor out.synchronize(); - return platformCtx.gateway().cacheStoreInvoke(ptr, mem.pointer(), cb); + int res = platformCtx.gateway().cacheStoreInvoke(ptr, mem.pointer(), cb); + + if (res != 0) { + // Read error + Object nativeErr = platformCtx.reader(mem.input()).readObjectDetached(); + + throw platformCtx.createNativeException(nativeErr); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java index 6692a23..5985d22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java @@ -25,7 +25,6 @@ import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; -import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; @@ -293,10 
+292,7 @@ public class PlatformFutureUtils { BinaryRawWriterEx outWriter = ctx.writer(out); - outWriter.writeString(err.getClass().getName()); - outWriter.writeString(err.getMessage()); - outWriter.writeString(X.getFullStackTrace(err)); - + PlatformUtils.writeError(err, outWriter); PlatformUtils.writeErrorData(err, outWriter); out.synchronize(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java index dd90fda..ccdd59d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java @@ -575,6 +575,31 @@ public class PlatformUtils { } /** + * Writes error. + * + * @param ex Error. + * @param writer Writer. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public static void writeError(Throwable ex, BinaryRawWriterEx writer) { + writer.writeObjectDetached(ex.getClass().getName()); + + writer.writeObjectDetached(ex.getMessage()); + + writer.writeObjectDetached(X.getFullStackTrace(ex)); + + PlatformNativeException nativeCause = X.cause(ex, PlatformNativeException.class); + + if (nativeCause != null) { + writer.writeBoolean(true); + + writer.writeObjectDetached(nativeCause.cause()); + } + else + writer.writeBoolean(false); + } + + /** * Writer error data. * * @param err Error. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h index 88130d8..3abd651 100644 --- a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h +++ b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_utils.h @@ -26,6 +26,8 @@ #include "ignite/date.h" #include "ignite/timestamp.h" +#include "ignite/binary/binary_type.h" + namespace ignite { namespace impl @@ -538,7 +540,92 @@ namespace ignite */ static Timestamp MakeTimestampLocal(int year = 1900, int month = 1, int day = 1, int hour = 0, int min = 0, int sec = 0, long ns = 0); + + /** + * Get default value for the type. + * + * @return Null value for non primitive types and zeroes for primitives. + */ + template + static T GetDefaultValue() + { + ignite::binary::BinaryType binType; + + return binType.GetNull(); + } }; + + template<> + inline int8_t BinaryUtils::GetDefaultValue() + { + return 0; + } + + template<> + inline int16_t BinaryUtils::GetDefaultValue() + { + return 0; + } + + template<> + inline uint16_t BinaryUtils::GetDefaultValue() + { + return 0; + } + + template<> + inline int32_t BinaryUtils::GetDefaultValue() + { + return 0; + } + + template<> + inline int64_t BinaryUtils::GetDefaultValue() + { + return 0; + } + + template<> + inline bool BinaryUtils::GetDefaultValue() + { + return false; + } + + template<> + inline float BinaryUtils::GetDefaultValue() + { + return 0.0f; + } + + template<> + inline double BinaryUtils::GetDefaultValue() + { + return 0.0; + } + + template<> + inline Guid BinaryUtils::GetDefaultValue() + { + return Guid(); + } + + template<> + inline Date BinaryUtils::GetDefaultValue() + { + return Date(); + } + + template<> + inline Timestamp 
BinaryUtils::GetDefaultValue() + { + return Timestamp(); + } + + template<> + inline std::string BinaryUtils::GetDefaultValue() + { + return std::string(); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp b/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp index 33205e4..c3f4fcc 100644 --- a/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp +++ b/modules/platforms/cpp/binary/src/impl/binary/binary_reader_impl.cpp @@ -676,49 +676,57 @@ namespace ignite template <> int8_t BinaryReaderImpl::ReadTopObject() { - return ReadTopObject0(IGNITE_TYPE_BYTE, BinaryUtils::ReadInt8, static_cast(0)); + return ReadTopObject0(IGNITE_TYPE_BYTE, BinaryUtils::ReadInt8, + BinaryUtils::GetDefaultValue()); } template <> bool BinaryReaderImpl::ReadTopObject() { - return ReadTopObject0(IGNITE_TYPE_BOOL, BinaryUtils::ReadBool, static_cast(0)); + return ReadTopObject0(IGNITE_TYPE_BOOL, BinaryUtils::ReadBool, + BinaryUtils::GetDefaultValue()); } template <> int16_t BinaryReaderImpl::ReadTopObject() { - return ReadTopObject0(IGNITE_TYPE_SHORT, BinaryUtils::ReadInt16, static_cast(0)); + return ReadTopObject0(IGNITE_TYPE_SHORT, BinaryUtils::ReadInt16, + BinaryUtils::GetDefaultValue()); } template <> uint16_t BinaryReaderImpl::ReadTopObject() { - return ReadTopObject0(IGNITE_TYPE_CHAR, BinaryUtils::ReadUInt16, static_cast(0)); + return ReadTopObject0(IGNITE_TYPE_CHAR, BinaryUtils::ReadUInt16, + BinaryUtils::GetDefaultValue()); } template <> int32_t BinaryReaderImpl::ReadTopObject() { - return ReadTopObject0(IGNITE_TYPE_INT, BinaryUtils::ReadInt32, static_cast(0)); + return ReadTopObject0(IGNITE_TYPE_INT, BinaryUtils::ReadInt32, + BinaryUtils::GetDefaultValue()); } template <> int64_t BinaryReaderImpl::ReadTopObject() { - 
return ReadTopObject0(IGNITE_TYPE_LONG, BinaryUtils::ReadInt64, static_cast(0)); + return ReadTopObject0(IGNITE_TYPE_LONG, BinaryUtils::ReadInt64, + BinaryUtils::GetDefaultValue()); } template <> float BinaryReaderImpl::ReadTopObject() { - return ReadTopObject0(IGNITE_TYPE_FLOAT, BinaryUtils::ReadFloat, static_cast(0)); + return ReadTopObject0(IGNITE_TYPE_FLOAT, BinaryUtils::ReadFloat, + BinaryUtils::GetDefaultValue()); } template <> double BinaryReaderImpl::ReadTopObject() { - return ReadTopObject0(IGNITE_TYPE_DOUBLE, BinaryUtils::ReadDouble, static_cast(0)); + return ReadTopObject0(IGNITE_TYPE_DOUBLE, BinaryUtils::ReadDouble, + BinaryUtils::GetDefaultValue()); } template <> @@ -729,7 +737,7 @@ namespace ignite if (typeId == IGNITE_TYPE_UUID) return BinaryUtils::ReadGuid(stream); else if (typeId == IGNITE_HDR_NULL) - return Guid(); + return BinaryUtils::GetDefaultValue(); else { int32_t pos = stream->Position() - 1; @@ -747,7 +755,7 @@ namespace ignite else if (typeId == IGNITE_TYPE_TIMESTAMP) return Date(BinaryUtils::ReadTimestamp(stream).GetMilliseconds()); else if (typeId == IGNITE_HDR_NULL) - return Date(); + return BinaryUtils::GetDefaultValue(); else { int32_t pos = stream->Position() - 1; @@ -763,7 +771,7 @@ namespace ignite if (typeId == IGNITE_TYPE_TIMESTAMP) return BinaryUtils::ReadTimestamp(stream); else if (typeId == IGNITE_HDR_NULL) - return Timestamp(); + return BinaryUtils::GetDefaultValue(); else { int32_t pos = stream->Position() - 1; http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h b/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h index 8b6ebb9..4042fa2 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h +++ 
b/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h @@ -68,14 +68,25 @@ namespace ignite /** * Internal out-in operation. + * Uses two independent memory pieces to write and read data. * * @param opType Operation type. * @param inOp Input. * @param outOp Output. * @param err Error. */ - void OutInOp(int32_t opType, InputOperation& inOp, OutputOperation& outOp, - IgniteError* err); + void OutInOp(int32_t opType, InputOperation& inOp, OutputOperation& outOp, IgniteError* err); + + /** + * Internal out-in operation. + * Uses single memory piece to write and read data. + * + * @param opType Operation type. + * @param inOp Input. + * @param outOp Output. + * @param err Error. + */ + void OutInOpX(int32_t opType, InputOperation& inOp, OutputOperation& outOp, IgniteError* err); /** * Get environment shared pointer. http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/cpp/core/include/ignite/impl/operations.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/operations.h b/modules/platforms/cpp/core/include/ignite/impl/operations.h index ed01ece..a8fef93 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/operations.h +++ b/modules/platforms/cpp/core/include/ignite/impl/operations.h @@ -27,6 +27,7 @@ #include "ignite/cache/cache_entry.h" #include "ignite/impl/binary/binary_reader_impl.h" #include "ignite/impl/binary/binary_writer_impl.h" +#include "ignite/impl/binary/binary_utils.h" #include "ignite/binary/binary.h" namespace ignite @@ -270,7 +271,12 @@ namespace ignite * * @param reader Reader. */ - virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader) = 0; + virtual void ProcessOutput(binary::BinaryReaderImpl& reader) = 0; + + /** + * Sets result to null value. + */ + virtual void SetNull() = 0; }; /** @@ -288,11 +294,16 @@ namespace ignite // No-op. 
} - virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader) + virtual void ProcessOutput(binary::BinaryReaderImpl& reader) { val = reader.ReadTopObject(); } + virtual void SetNull() + { + val = binary::BinaryUtils::GetDefaultValue(); + } + /** * Get value. * @@ -324,12 +335,18 @@ namespace ignite // No-op. } - virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader) + virtual void ProcessOutput(binary::BinaryReaderImpl& reader) { val1 = reader.ReadTopObject(); val2 = reader.ReadTopObject(); } + virtual void SetNull() + { + val1 = binary::BinaryUtils::GetDefaultValue(); + val2 = binary::BinaryUtils::GetDefaultValue(); + } + /** * Get value 1. * @@ -375,7 +392,7 @@ namespace ignite // No-op. } - virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader) + virtual void ProcessOutput(binary::BinaryReaderImpl& reader) { val1 = reader.ReadTopObject(); val2 = reader.ReadTopObject(); @@ -383,6 +400,14 @@ namespace ignite val4 = reader.ReadTopObject(); } + virtual void SetNull() + { + val1 = binary::BinaryUtils::GetDefaultValue(); + val2 = binary::BinaryUtils::GetDefaultValue(); + val3 = binary::BinaryUtils::GetDefaultValue(); + val4 = binary::BinaryUtils::GetDefaultValue(); + } + /** * Get value 1. * @@ -454,7 +479,7 @@ namespace ignite // No-op. } - virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader) + virtual void ProcessOutput(binary::BinaryReaderImpl& reader) { bool exists = reader.GetStream()->ReadBool(); @@ -475,6 +500,11 @@ namespace ignite } } + virtual void SetNull() + { + // No-op. + } + /** * Get value. * @@ -506,7 +536,7 @@ namespace ignite // No-op. } - virtual void ProcessOutput(ignite::impl::binary::BinaryReaderImpl& reader) + virtual void ProcessOutput(binary::BinaryReaderImpl& reader) { int32_t cnt = reader.ReadInt32(); @@ -519,6 +549,11 @@ namespace ignite } } + virtual void SetNull() + { + res->clear(); + } + private: /** Entries. 
*/ std::vector>* res; http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp index e728f55..8197187 100644 --- a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp @@ -156,17 +156,17 @@ namespace ignite void CacheImpl::LocalPeek(InputOperation& inOp, OutputOperation& outOp, int32_t peekModes, IgniteError* err) { - OutInOp(OP_LOCAL_PEEK, inOp, outOp, err); + OutInOpX(OP_LOCAL_PEEK, inOp, outOp, err); } void CacheImpl::Get(InputOperation& inOp, OutputOperation& outOp, IgniteError* err) { - OutInOp(OP_GET, inOp, outOp, err); + OutInOpX(OP_GET, inOp, outOp, err); } void CacheImpl::GetAll(InputOperation& inOp, OutputOperation& outOp, IgniteError* err) { - OutInOp(OP_GET_ALL, inOp, outOp, err); + OutInOpX(OP_GET_ALL, inOp, outOp, err); } void CacheImpl::Put(InputOperation& inOp, IgniteError* err) @@ -181,17 +181,17 @@ namespace ignite void CacheImpl::GetAndPut(InputOperation& inOp, OutputOperation& outOp, IgniteError* err) { - OutInOp(OP_GET_AND_PUT, inOp, outOp, err); + OutInOpX(OP_GET_AND_PUT, inOp, outOp, err); } void CacheImpl::GetAndReplace(InputOperation& inOp, OutputOperation& outOp, IgniteError* err) { - OutInOp(OP_GET_AND_REPLACE, inOp, outOp, err); + OutInOpX(OP_GET_AND_REPLACE, inOp, outOp, err); } void CacheImpl::GetAndRemove(InputOperation& inOp, OutputOperation& outOp, IgniteError* err) { - OutInOp(OP_GET_AND_REMOVE, inOp, outOp, err); + OutInOpX(OP_GET_AND_REMOVE, inOp, outOp, err); } bool CacheImpl::PutIfAbsent(InputOperation& inOp, IgniteError* err) @@ -201,7 +201,7 @@ namespace ignite void CacheImpl::GetAndPutIfAbsent(InputOperation& inOp, OutputOperation& outOp, IgniteError* err) { - OutInOp(OP_GET_AND_PUT_IF_ABSENT, inOp, 
outOp, err); + OutInOpX(OP_GET_AND_PUT_IF_ABSENT, inOp, outOp, err); } bool CacheImpl::Replace(InputOperation& inOp, IgniteError* err) http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp b/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp index 05764c7..5d17214 100644 --- a/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp +++ b/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp @@ -31,6 +31,21 @@ namespace ignite { namespace interop { + /** + * Operation result. + */ + enum OperationResult + { + /** Null. */ + ResultNull = 0, + + /** Object. */ + ResultObject = 1, + + /** Error. */ + ResultError = -1 + }; + InteropTarget::InteropTarget(SharedPointer env, jobject javaRef) : env(env), javaRef(javaRef) { @@ -116,8 +131,7 @@ namespace ignite return false; } - void InteropTarget::OutInOp(int32_t opType, InputOperation& inOp, OutputOperation& outOp, - IgniteError* err) + void InteropTarget::OutInOp(int32_t opType, InputOperation& inOp, OutputOperation& outOp, IgniteError* err) { JniErrorInfo jniErr; @@ -137,6 +151,29 @@ namespace ignite ReadFrom(inMem.Get(), outOp); } } + + void InteropTarget::OutInOpX(int32_t opType, InputOperation& inOp, OutputOperation& outOp, IgniteError* err) + { + JniErrorInfo jniErr; + + SharedPointer outInMem = env.Get()->AllocateMemory(); + + int64_t outInPtr = WriteTo(outInMem.Get(), inOp, err); + + if (outInPtr) + { + int64_t res = env.Get()->Context()->TargetInStreamOutLong(javaRef, opType, outInPtr, &jniErr); + + IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + + if (jniErr.code == IGNITE_JNI_ERR_SUCCESS && res == ResultObject) + ReadFrom(outInMem.Get(), outOp); + else if (res == ResultNull) + outOp.SetNull(); + + //Read and process error if res == ResultError here. 
+ } + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp b/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp index 6c01332..fed43fc 100644 --- a/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp @@ -145,6 +145,11 @@ namespace ignite val = TransactionMetrics(commitTime, rollbackTime, commits, rollbacks); } + virtual void SetNull() + { + // No-op. + } + /** * Get value. * http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkRunner.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkRunner.cs b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkRunner.cs index 5d8e78a..40ae01e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkRunner.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/BenchmarkRunner.cs @@ -21,6 +21,7 @@ namespace Apache.Ignite.Benchmarks using System.Diagnostics; using System.Text; using Apache.Ignite.Benchmarks.Binary; + using Apache.Ignite.Benchmarks.Interop; /// /// Benchmark runner. 
@@ -35,8 +36,8 @@ namespace Apache.Ignite.Benchmarks public static void Main(string[] args) { args = new[] { - typeof(BinarizableReadBenchmark).FullName, - "-ConfigPath", @"modules\platforms\dotnet\Apache.Ignite.Benchmarks\Config\benchmark.xml", + typeof(GetBenchmark).FullName, + "-ConfigPath", @"C:\W\incubator-ignite\modules\platforms\dotnet\Apache.Ignite.Benchmarks\Config\benchmark.xml", "-Threads", "1", "-Warmup", "0", "-Duration", "60", http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Interop/PlatformBenchmarkBase.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Interop/PlatformBenchmarkBase.cs b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Interop/PlatformBenchmarkBase.cs index eeebed0..f437eb8 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Interop/PlatformBenchmarkBase.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Interop/PlatformBenchmarkBase.cs @@ -66,7 +66,7 @@ namespace Apache.Ignite.Benchmarks.Interop "-DIGNITE_QUIET=false", "-DIGNITE_NO_SHUTDOWN_HOOK=true" }, - JvmClasspath = Classpath ?? Core.Impl.Common.Classpath.CreateClasspath(), + JvmClasspath = Classpath ?? 
Core.Impl.Common.Classpath.CreateClasspath(forceTestClasspath: true), JvmDllPath = DllPath, SpringConfigUrl = ConfigPath }; http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs index 5fb2cdd..9fd1f1d 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAbstractTest.cs @@ -2952,7 +2952,7 @@ namespace Apache.Ignite.Core.Tests.Cache Assert.IsInstanceOf(ex); if (string.IsNullOrEmpty(containsText)) - Assert.AreEqual(ex.InnerException.Message, AddArgCacheEntryProcessor.ExceptionText); + Assert.AreEqual(AddArgCacheEntryProcessor.ExceptionText, ex.InnerException.Message); else Assert.IsTrue(ex.ToString().Contains(containsText)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs index 8061e9f..d39ccde 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs @@ -178,13 +178,15 @@ namespace Apache.Ignite.Core.Tests.Cache.Store [TearDown] public void AfterTest() { + CacheTestStore.Reset(); + var cache = GetCache(); cache.Clear(); - Assert.IsTrue(cache.IsEmpty(), "Cache is not empty: " + cache.GetSize()); - - CacheTestStore.Reset(); + Assert.IsTrue(cache.IsEmpty(), + "Cache is not empty: " + + 
string.Join(", ", cache.Select(x => string.Format("[{0}:{1}]", x.Key, x.Value)))); TestUtils.AssertHandleRegistryHasItems(300, _storeCount, Ignition.GetIgnite(GridName)); @@ -210,6 +212,11 @@ namespace Apache.Ignite.Core.Tests.Cache.Store // Test exception in filter Assert.Throws(() => cache.LoadCache(new ExceptionalEntryFilter(), 100, 10)); + + // Test exception in store + CacheTestStore.ThrowError = true; + CheckCustomStoreError(Assert.Throws(() => + cache.LoadCache(new CacheEntryFilter(), 100, 10)).InnerException); } [Test] @@ -262,6 +269,13 @@ namespace Apache.Ignite.Core.Tests.Cache.Store { Assert.AreEqual("val_" + i, cache.GetAsync(i).Result); } + + // Test errors + CacheTestStore.ThrowError = true; + CheckCustomStoreError( + Assert.Throws( + () => cache.LocalLoadCacheAsync(new CacheEntryFilter(), 100, 10).Wait()) + .InnerException); } [Test] @@ -282,6 +296,13 @@ namespace Apache.Ignite.Core.Tests.Cache.Store Assert.AreEqual("val", cache.Get(1)); Assert.AreEqual(1, cache.GetSize()); + + // Test errors + CacheTestStore.ThrowError = true; + CheckCustomStoreError(Assert.Throws(() => cache.Put(-2, "fail")).InnerException); + + cache.LocalEvict(new[] { 1 }); + CheckCustomStoreError(Assert.Throws(() => cache.Get(1)).InnerException); } [Test] @@ -418,8 +439,6 @@ namespace Apache.Ignite.Core.Tests.Cache.Store using (var tx = cache.Ignite.GetTransactions().TxStart()) { - CacheTestStore.ExpCommit = true; - tx.AddMeta("meta", 100); cache.Put(1, "val"); @@ -549,6 +568,16 @@ namespace Apache.Ignite.Core.Tests.Cache.Store return Ignition.GetIgnite(GridName).GetOrCreateCache(cacheName); } + + private static void CheckCustomStoreError(Exception err) + { + var customErr = err as CacheTestStore.CustomStoreException ?? 
+ err.InnerException as CacheTestStore.CustomStoreException; + + Assert.IsNotNull(customErr); + + Assert.AreEqual(customErr.Message, customErr.Details); + } } /// http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs index b4b1670..4224835 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheTestStore.cs @@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; + using System.Runtime.Serialization; using System.Threading; using Apache.Ignite.Core.Cache.Store; using Apache.Ignite.Core.Resource; @@ -32,12 +33,12 @@ namespace Apache.Ignite.Core.Tests.Cache.Store { public static readonly IDictionary Map = new ConcurrentDictionary(); - public static bool ExpCommit; - public static bool LoadMultithreaded; public static bool LoadObjects; + public static bool ThrowError; + [InstanceResource] private IIgnite _grid = null; @@ -54,13 +55,15 @@ namespace Apache.Ignite.Core.Tests.Cache.Store { Map.Clear(); - ExpCommit = false; LoadMultithreaded = false; LoadObjects = false; + ThrowError = false; } public void LoadCache(Action act, params object[] args) { + ThrowIfNeeded(); + Debug.Assert(_grid != null); if (LoadMultithreaded) @@ -91,6 +94,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store public object Load(object key) { + ThrowIfNeeded(); + Debug.Assert(_grid != null); return Map[key]; @@ -98,6 +103,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store public IDictionary LoadAll(ICollection keys) { + ThrowIfNeeded(); + Debug.Assert(_grid != null); 
return keys.OfType().ToDictionary(key => key, key => "val_" + key); @@ -105,6 +112,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store public void Write(object key, object val) { + ThrowIfNeeded(); + Debug.Assert(_grid != null); Map[key] = val; @@ -112,6 +121,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store public void WriteAll(IDictionary map) { + ThrowIfNeeded(); + Debug.Assert(_grid != null); foreach (DictionaryEntry e in map) @@ -120,6 +131,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store public void Delete(object key) { + ThrowIfNeeded(); + Debug.Assert(_grid != null); Map.Remove(key); @@ -127,6 +140,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store public void DeleteAll(ICollection keys) { + ThrowIfNeeded(); + Debug.Assert(_grid != null); foreach (object key in keys) @@ -151,5 +166,34 @@ namespace Apache.Ignite.Core.Tests.Cache.Store get { return stringProperty; } set { stringProperty = value; } } + + private static void ThrowIfNeeded() + { + if (ThrowError) + throw new CustomStoreException("Exception in cache store"); + } + + [Serializable] + public class CustomStoreException : Exception, ISerializable + { + public string Details { get; private set; } + + public CustomStoreException(string message) : base(message) + { + Details = message; + } + + protected CustomStoreException(SerializationInfo info, StreamingContext ctx) : base(info, ctx) + { + Details = info.GetString("details"); + } + + public override void GetObjectData(SerializationInfo info, StreamingContext context) + { + info.AddValue("details", Details); + + base.GetObjectData(info, context); + } + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs index 32c59de..8ba3e29 100644 --- 
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs @@ -271,7 +271,7 @@ namespace Apache.Ignite.Core.Impl.Cache /// private void LoadCache0(ICacheEntryFilter p, object[] args, int opId) { - DoOutOp(opId, writer => + DoOutInOpX(opId, writer => { if (p != null) { @@ -284,7 +284,7 @@ namespace Apache.Ignite.Core.Impl.Cache writer.WriteObject(null); writer.WriteArray(args); - }); + }, ReadException); } /** */ @@ -296,7 +296,7 @@ namespace Apache.Ignite.Core.Impl.Cache /** */ public Task LoadAllAsync(IEnumerable keys, bool replaceExistingValues) { - return GetFuture((futId, futTyp) => DoOutOp((int) CacheOp.LoadAll, writer => + return GetFuture((futId, futTyp) => DoOutOp(CacheOp.LoadAll, writer => { writer.WriteLong(futId); writer.WriteBoolean(replaceExistingValues); @@ -309,7 +309,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutOp((int)CacheOp.ContainsKey, key) == True; + return DoOutOp(CacheOp.ContainsKey, key); } /** */ @@ -325,7 +325,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutOp((int)CacheOp.ContainsKeys, writer => WriteEnumerable(writer, keys)) == True; + return DoOutOp(CacheOp.ContainsKeys, writer => WriteEnumerable(writer, keys)); } /** */ @@ -354,11 +354,14 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - var res = DoOutInOpNullable((int)CacheOp.Peek, writer => - { - writer.Write(key); - writer.WriteInt(EncodePeekModes(modes)); - }); + var res = DoOutInOpX((int)CacheOp.Peek, + w => + { + w.Write(key); + w.WriteInt(EncodePeekModes(modes)); + }, + (s, r) => r == True ? new CacheResult(Unmarshal(s)) : new CacheResult(), + ReadException); value = res.Success ? 
res.Value : default(TV); @@ -389,19 +392,22 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - var result = DoOutInOpNullable((int) CacheOp.Get, key); - - if (!IsAsync) - { - if (!result.Success) - throw GetKeyNotFoundException(); + return DoOutInOpX((int) CacheOp.Get, + w => w.Write(key), + (stream, res) => + { + if (res == True) // Not null + { + Debug.Assert(!IsAsync); - return result.Value; - } + return Unmarshal(stream); + } - Debug.Assert(!result.Success); + if (!IsAsync) + throw GetKeyNotFoundException(); - return default(TV); + return default(TV); + }, ReadException); } /** */ @@ -426,7 +432,7 @@ namespace Apache.Ignite.Core.Impl.Cache if (IsAsync) throw new InvalidOperationException("TryGet can't be used in async mode."); - var res = DoOutInOpNullable((int) CacheOp.Get, key); + var res = DoOutInOpNullable(CacheOp.Get, key); value = res.Value; @@ -448,14 +454,10 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutInOp((int)CacheOp.GetAll, + return DoOutInOpX((int) CacheOp.GetAll, writer => WriteEnumerable(writer, keys), - input => - { - var reader = Marshaller.StartUnmarshal(input, _flagKeepBinary); - - return ReadGetAllDictionary(reader); - }); + (s, r) => r == True ? 
ReadGetAllDictionary(Marshaller.StartUnmarshal(s, _flagKeepBinary)) : null, + ReadException); } /** */ @@ -473,7 +475,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - DoOutOp((int)CacheOp.Put, key, val); + DoOutOp(CacheOp.Put, key, val); } /** */ @@ -491,7 +493,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOpNullable((int)CacheOp.GetAndPut, key, val); + return DoOutInOpNullable(CacheOp.GetAndPut, key, val); } /** */ @@ -509,7 +511,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOpNullable((int) CacheOp.GetAndReplace, key, val); + return DoOutInOpNullable(CacheOp.GetAndReplace, key, val); } /** */ @@ -525,7 +527,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutInOpNullable((int)CacheOp.GetAndRemove, key); + return DoOutInOpNullable(CacheOp.GetAndRemove, key); } /** */ @@ -543,7 +545,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - return DoOutOp((int) CacheOp.PutIfAbsent, key, val) == True; + return DoOutOp(CacheOp.PutIfAbsent, key, val); } /** */ @@ -561,7 +563,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - return DoOutInOpNullable((int)CacheOp.GetAndPutIfAbsent, key, val); + return DoOutInOpNullable(CacheOp.GetAndPutIfAbsent, key, val); } /** */ @@ -579,7 +581,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - return DoOutOp((int) CacheOp.Replace2, key, val) == True; + return DoOutOp(CacheOp.Replace2, key, val); } /** */ @@ -599,7 +601,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(newVal, "newVal"); - return DoOutOp((int)CacheOp.Replace3, key, oldVal, newVal) == True; + return DoOutOp(CacheOp.Replace3, key, oldVal, newVal); } /** */ @@ -615,7 +617,7 @@ namespace Apache.Ignite.Core.Impl.Cache { 
IgniteArgumentCheck.NotNull(vals, "vals"); - DoOutOp((int) CacheOp.PutAll, writer => WriteDictionary(writer, vals)); + DoOutOp(CacheOp.PutAll, writer => WriteDictionary(writer, vals)); } /** */ @@ -631,7 +633,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - DoOutOp((int) CacheOp.LocEvict, writer => WriteEnumerable(writer, keys)); + DoOutOp(CacheOp.LocEvict, writer => WriteEnumerable(writer, keys)); } /** */ @@ -653,7 +655,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - DoOutOp((int) CacheOp.Clear, key); + DoOutOp(CacheOp.Clear, key); } /** */ @@ -669,7 +671,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - DoOutOp((int)CacheOp.ClearAll, writer => WriteEnumerable(writer, keys)); + DoOutOp(CacheOp.ClearAll, writer => WriteEnumerable(writer, keys)); } /** */ @@ -685,7 +687,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - DoOutOp((int) CacheOp.LocalClear, key); + DoOutOp(CacheOp.LocalClear, key); } /** */ @@ -693,7 +695,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - DoOutOp((int)CacheOp.LocalClearAll, writer => WriteEnumerable(writer, keys)); + DoOutOp(CacheOp.LocalClearAll, writer => WriteEnumerable(writer, keys)); } /** */ @@ -701,7 +703,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutOp((int) CacheOp.RemoveObj, key) == True; + return DoOutOp(CacheOp.RemoveObj, key); } /** */ @@ -719,7 +721,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(val, "val"); - return DoOutOp((int)CacheOp.RemoveBool, key, val) == True; + return DoOutOp(CacheOp.RemoveBool, key, val); } /** */ @@ -735,7 +737,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - DoOutOp((int)CacheOp.RemoveAll, writer => WriteEnumerable(writer, keys)); + DoOutOp(CacheOp.RemoveAll, writer 
=> WriteEnumerable(writer, keys)); } /** */ @@ -798,7 +800,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - DoOutOp((int)CacheOp.LocPromote, writer => WriteEnumerable(writer, keys)); + DoOutOp(CacheOp.LocPromote, writer => WriteEnumerable(writer, keys)); } /** */ @@ -811,12 +813,14 @@ namespace Apache.Ignite.Core.Impl.Cache var holder = new CacheEntryProcessorHolder(processor, arg, (e, a) => processor.Process((IMutableCacheEntry)e, (TArg)a), typeof(TK), typeof(TV)); - return DoOutInOp((int)CacheOp.Invoke, writer => - { - writer.Write(key); - writer.Write(holder); - }, - input => GetResultOrThrow(Unmarshal(input))); + return DoOutInOpX((int) CacheOp.Invoke, + writer => + { + writer.Write(key); + writer.Write(holder); + }, + (input, res) => res == True ? Unmarshal(input) : default(TRes), + ReadException); } /** */ @@ -849,17 +853,19 @@ namespace Apache.Ignite.Core.Impl.Cache var holder = new CacheEntryProcessorHolder(processor, arg, (e, a) => processor.Process((IMutableCacheEntry)e, (TArg)a), typeof(TK), typeof(TV)); - return DoOutInOp((int) CacheOp.InvokeAll, + return DoOutInOpX((int) CacheOp.InvokeAll, writer => { WriteEnumerable(writer, keys); writer.Write(holder); }, - input => ReadInvokeAllResults(input)); + (input, res) => res == True ? 
ReadInvokeAllResults(input) : null, + ReadException); } /** */ - public Task>> InvokeAllAsync(IEnumerable keys, ICacheEntryProcessor processor, TArg arg) + public Task>> InvokeAllAsync(IEnumerable keys, + ICacheEntryProcessor processor, TArg arg) { AsyncInstance.InvokeAll(keys, processor, arg); @@ -871,10 +877,8 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutInOp((int)CacheOp.Lock, writer => - { - writer.Write(key); - }, input => new CacheLock(input.ReadInt(), Target)); + return DoOutInOpX((int) CacheOp.Lock, w => w.Write(key), + (stream, res) => new CacheLock(res, Target), ReadException); } /** */ @@ -882,10 +886,8 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutInOp((int)CacheOp.LockAll, writer => - { - WriteEnumerable(writer, keys); - }, input => new CacheLock(input.ReadInt(), Target)); + return DoOutInOpX((int) CacheOp.LockAll, w => WriteEnumerable(w, keys), + (stream, res) => new CacheLock(res, Target), ReadException); } /** */ @@ -893,11 +895,11 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(key, "key"); - return DoOutOp((int)CacheOp.IsLocalLocked, writer => + return DoOutOp(CacheOp.IsLocalLocked, writer => { writer.Write(key); writer.WriteBoolean(byCurrentThread); - }) == True; + }); } /** */ @@ -1159,22 +1161,6 @@ namespace Apache.Ignite.Core.Impl.Cache } /// - /// Unwraps an exception. - /// - /// Result type. - /// Object. - /// Result. - private static T GetResultOrThrow(object obj) - { - var err = obj as Exception; - - if (err != null) - throw err as CacheEntryProcessorException ?? new CacheEntryProcessorException(err); - - return obj == null ? default(T) : (T) obj; - } - - /// /// Reads results of InvokeAll operation. /// /// The type of the result. @@ -1208,9 +1194,11 @@ namespace Apache.Ignite.Core.Impl.Cache /// /// The stream. /// Exception. 
- private CacheEntryProcessorException ReadException(IBinaryStream inStream) + private Exception ReadException(IBinaryStream inStream) { - var item = Unmarshal(inStream); + var reader = Marshaller.StartUnmarshal(inStream, _flagKeepBinary); + + var item = reader.ReadObject(); var clsName = item as string; @@ -1219,8 +1207,9 @@ namespace Apache.Ignite.Core.Impl.Cache var msg = Unmarshal(inStream); var trace = Unmarshal(inStream); - - return new CacheEntryProcessorException(ExceptionUtils.GetException(_ignite, clsName, msg, trace)); + var inner = reader.ReadBoolean() ? reader.ReadObject() : null; + + return ExceptionUtils.GetException(_ignite, clsName, msg, trace, reader, inner); } /// @@ -1272,49 +1261,73 @@ namespace Apache.Ignite.Core.Impl.Cache } /// - /// Perform simple out-in operation accepting single argument. + /// Does the out op. /// - /// Operation type. - /// Value. - /// Result. - private CacheResult DoOutInOpNullable(int type, T1 val) + private bool DoOutOp(CacheOp op, T1 x) { - var res = DoOutInOp(type, val); + return DoOutInOpX((int) op, w => + { + w.Write(x); + }, ReadException); + } - return res == null - ? new CacheResult() - : new CacheResult((TR)res); + /// + /// Does the out op. + /// + private bool DoOutOp(CacheOp op, T1 x, T2 y) + { + return DoOutInOpX((int) op, w => + { + w.Write(x); + w.Write(y); + }, ReadException); } /// - /// Perform out-in operation. + /// Does the out op. /// - /// Operation type. - /// Out action. - /// Result. - private CacheResult DoOutInOpNullable(int type, Action outAction) + private bool DoOutOp(CacheOp op, T1 x, T2 y, T3 z) { - var res = DoOutInOp(type, outAction); + return DoOutInOpX((int) op, w => + { + w.Write(x); + w.Write(y); + w.Write(z); + }, ReadException); + } - return res == null - ? new CacheResult() - : new CacheResult((TR)res); + /// + /// Does the out op. 
+ /// + private bool DoOutOp(CacheOp op, Action write) + { + return DoOutInOpX((int) op, write, ReadException); } /// - /// Perform simple out-in operation accepting single argument. + /// Does the out-in op. /// - /// Operation type. - /// Value. - /// Value. - /// Result. - private CacheResult DoOutInOpNullable(int type, T1 val1, T2 val2) + private CacheResult DoOutInOpNullable(CacheOp cacheOp, TK x) { - var res = DoOutInOp(type, val1, val2); + return DoOutInOpX((int)cacheOp, + w => w.Write(x), + (stream, res) => res == True ? new CacheResult(Unmarshal(stream)) : new CacheResult(), + ReadException); + } - return res == null - ? new CacheResult() - : new CacheResult((TR)res); + /// + /// Does the out-in op. + /// + private CacheResult DoOutInOpNullable(CacheOp cacheOp, T1 x, T2 y) + { + return DoOutInOpX((int)cacheOp, + w => + { + w.Write(x); + w.Write(y); + }, + (stream, res) => res == True ? new CacheResult(Unmarshal(stream)) : new CacheResult(), + ReadException); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs index a8c1471..a59ca5f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs @@ -115,10 +115,14 @@ namespace Apache.Ignite.Core.Impl /// Exception message. /// Native stack trace. /// Error data reader. + /// Inner exception. /// Exception. 
- public static Exception GetException(IIgnite ignite, string clsName, string msg, string stackTrace, BinaryReader reader = null) + public static Exception GetException(IIgnite ignite, string clsName, string msg, string stackTrace, + BinaryReader reader = null, Exception innerException = null) { - Exception innerException = string.IsNullOrEmpty(stackTrace) ? null : new JavaException(stackTrace); + // Set JavaException as inner only if there is no InnerException. + if (innerException == null && !string.IsNullOrEmpty(stackTrace)) + innerException = new JavaException(stackTrace); ExceptionFactoryDelegate ctor; @@ -158,7 +162,7 @@ namespace Apache.Ignite.Core.Impl /// Reader. /// CachePartialUpdateException. [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] - private static Exception ProcessCachePartialUpdateException(IIgnite ignite, string msg, string stackTrace, + private static Exception ProcessCachePartialUpdateException(IIgnite ignite, string msg, string stackTrace, BinaryReader reader) { if (reader == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs index f757cbc..ac613c6 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs @@ -43,6 +43,9 @@ namespace Apache.Ignite.Core.Impl protected const int True = 1; /** */ + protected const int Error = -1; + + /** */ private const int OpMeta = -1; /** */ @@ -470,7 +473,82 @@ namespace Apache.Ignite.Core.Impl } } } - + + /// + /// Perform out-in operation with a single stream. + /// + /// The type of the r. + /// Operation type. + /// Out action. + /// In action. + /// The action to read an error. 
+ /// + /// Result. + /// + protected TR DoOutInOpX(int type, Action outAction, Func inAction, + Func inErrorAction) + { + Debug.Assert(inErrorAction != null); + + using (var stream = IgniteManager.Memory.Allocate().GetStream()) + { + var writer = _marsh.StartMarshal(stream); + + outAction(writer); + + FinishMarshal(writer); + + var res = UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput()); + + if (res != Error && inAction == null) + return default(TR); // quick path for void operations + + stream.SynchronizeInput(); + + stream.Seek(0, SeekOrigin.Begin); + + if (res != Error) + return inAction != null ? inAction(stream, res) : default(TR); + + throw inErrorAction(stream); + } + } + + /// + /// Perform out-in operation with a single stream. + /// + /// Operation type. + /// Out action. + /// The action to read an error. + /// + /// Result. + /// + protected bool DoOutInOpX(int type, Action outAction, + Func inErrorAction) + { + Debug.Assert(inErrorAction != null); + + using (var stream = IgniteManager.Memory.Allocate().GetStream()) + { + var writer = _marsh.StartMarshal(stream); + + outAction(writer); + + FinishMarshal(writer); + + var res = UU.TargetInStreamOutLong(_target, type, stream.SynchronizeOutput()); + + if (res != Error) + return res == True; + + stream.SynchronizeInput(); + + stream.Seek(0, SeekOrigin.Begin); + + throw inErrorAction(stream); + } + } + /// /// Perform out-in operation. 
/// http://git-wip-us.apache.org/repos/asf/ignite/blob/6899e06f/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs index 2f70426..c9284d5 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs @@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Globalization; + using System.IO; using System.Runtime.InteropServices; using System.Threading; using Apache.Ignite.Core.Cache.Affinity; @@ -328,6 +329,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged }, true); } + [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")] private int CacheStoreInvoke(void* target, long objPtr, long memPtr, void* cb) { @@ -342,7 +344,18 @@ namespace Apache.Ignite.Core.Impl.Unmanaged using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - return t.Invoke(stream, cb0, _ignite); + try + { + return t.Invoke(stream, cb0, _ignite); + } + catch (Exception e) + { + stream.Seek(0, SeekOrigin.Begin); + + _ignite.Marshaller.StartMarshal(stream).WriteObject(e); + + return -1; + } } }); } @@ -772,8 +785,9 @@ namespace Apache.Ignite.Core.Impl.Unmanaged string errCls = reader.ReadString(); string errMsg = reader.ReadString(); string stackTrace = reader.ReadString(); + Exception inner = reader.ReadBoolean() ? 
reader.ReadObject<Exception>() : null; - Exception err = ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace, reader); + Exception err = ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace, reader, inner); ProcessFuture(futPtr, fut => { fut.OnError(err); }); } @@ -1100,7 +1114,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged // Stream disposal intentionally omitted: IGNITE-1598 var stream = new PlatformRawMemory(errData, errDataLen).GetStream(); - throw ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace, + throw ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace, _ignite.Marshaller.StartUnmarshal(stream)); }