Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A20731793A for ; Wed, 26 Aug 2015 09:47:15 +0000 (UTC) Received: (qmail 46868 invoked by uid 500); 26 Aug 2015 09:47:15 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 46836 invoked by uid 500); 26 Aug 2015 09:47:15 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 46827 invoked by uid 99); 26 Aug 2015 09:47:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Aug 2015 09:47:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 12A3DDFE04; Wed, 26 Aug 2015 09:47:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Message-Id: <3eca243843524fe18267b6a9e51be484@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: Moved platform future utils to Ignite. Date: Wed, 26 Aug 2015 09:47:15 +0000 (UTC) Repository: ignite Updated Branches: refs/heads/master bcf3054b0 -> cdf82e942 Moved platform future utils to Ignite. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cdf82e94 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cdf82e94 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cdf82e94 Branch: refs/heads/master Commit: cdf82e942d0a2577cd077faf5c60dcc3bec4886a Parents: bcf3054 Author: vozerov-gridgain Authored: Wed Aug 26 12:47:55 2015 +0300 Committer: vozerov-gridgain Committed: Wed Aug 26 12:47:55 2015 +0300 ---------------------------------------------------------------------- .../platform/PlatformExtendedException.java | 39 +++ .../platform/utils/PlatformFutureUtils.java | 326 +++++++++++++++++++ .../platform/utils/PlatformUtils.java | 54 +++ 3 files changed, 419 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cdf82e94/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformExtendedException.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformExtendedException.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformExtendedException.java new file mode 100644 index 0000000..80b1703 --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformExtendedException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform; + +import org.apache.ignite.internal.portable.*; + +/** + * Denotes an exception which has some data to be written in a special manner. + */ +public interface PlatformExtendedException { + /** + * Gets platform context. + * + * @return Platform context. + */ + public PlatformContext context(); + + /** + * Write data. + * + * @param writer Writer. + */ + public void writeData(PortableRawWriterEx writer); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cdf82e94/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java new file mode 100644 index 0000000..fa986fe --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.utils; + +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.callback.*; +import org.apache.ignite.internal.processors.platform.memory.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +/** + * Interop future utils. + */ +public class PlatformFutureUtils { + /** Future type: byte. */ + public static final int TYP_BYTE = 1; + + /** Future type: boolean. */ + public static final int TYP_BOOL = 2; + + /** Future type: short. */ + public static final int TYP_SHORT = 3; + + /** Future type: char. */ + public static final int TYP_CHAR = 4; + + /** Future type: int. */ + public static final int TYP_INT = 5; + + /** Future type: float. */ + public static final int TYP_FLOAT = 6; + + /** Future type: long. */ + public static final int TYP_LONG = 7; + + /** Future type: double. */ + public static final int TYP_DOUBLE = 8; + + /** Future type: object. */ + public static final int TYP_OBJ = 9; + + /** + * Listen future. + * + * @param ctx Interop context. + * @param fut Java future. + * @param futPtr Native future pointer. + * @param typ Expected return type. + */ + public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ) { + listen(ctx, new FutureListenable(fut), futPtr, typ, null); + } + + /** + * Listen future. + * + * @param ctx Interop context. + * @param fut Java future. + * @param futPtr Native future pointer. + * @param typ Expected return type. + * @param writer Writer. + */ + public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ, + Writer writer) { + listen(ctx, new FutureListenable(fut), futPtr, typ, writer); + } + + /** + * Listen future. + * + * @param ctx Interop context. + * @param fut Java future. + * @param futPtr Native future pointer. + * @param writer Writer. + */ + public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, Writer writer) { + listen(ctx, new FutureListenable(fut), futPtr, TYP_OBJ, writer); + } + + /** + * Listen future. + * + * @param ctx Interop context. + * @param listenable Listenable entry. + * @param futPtr Native future pointer. + * @param typ Expected return type. + * @param writer Optional writer. + */ + @SuppressWarnings("unchecked") + private static void listen(final PlatformContext ctx, Listenable listenable, final long futPtr, final int typ, + @Nullable final Writer writer) { + final PlatformCallbackGateway gate = ctx.gateway(); + + listenable.listen(new IgniteBiInClosure() { + private static final long serialVersionUID = 0L; + + @Override public void apply(Object res, Throwable err) { + if (writer != null && writeToWriter(res, err, ctx, writer, futPtr)) + return; + + if (err != null) { + writeFutureError(ctx, futPtr, err); + + return; + } + + try { + if (typ == TYP_OBJ) { + if (res == null) + gate.futureNullResult(futPtr); + else { + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx outWriter = ctx.writer(out); + + outWriter.writeObjectDetached(res); + + out.synchronize(); + + gate.futureObjectResult(futPtr, mem.pointer()); + } + } + } + else if (res == null) + gate.futureNullResult(futPtr); + else { + switch (typ) { + case TYP_BYTE: + gate.futureByteResult(futPtr, (byte) res); + + break; + + case TYP_BOOL: + gate.futureBoolResult(futPtr, (boolean) res ? 1 : 0); + + break; + + case TYP_SHORT: + gate.futureShortResult(futPtr, (short) res); + + break; + + case TYP_CHAR: + gate.futureCharResult(futPtr, (char) res); + + break; + + case TYP_INT: + gate.futureIntResult(futPtr, (int) res); + + break; + + case TYP_FLOAT: + gate.futureFloatResult(futPtr, (float) res); + + break; + + case TYP_LONG: + gate.futureLongResult(futPtr, (long) res); + + break; + + case TYP_DOUBLE: + gate.futureDoubleResult(futPtr, (double) res); + + break; + + default: + assert false : "Should not reach this: " + typ; + } + } + } + catch (Throwable t) { + writeFutureError(ctx, futPtr, t); + + if (t instanceof Error) + throw t; + } + } + }); + } + + /** + * Write future error. + * + * @param ctx Interop context. + * @param futPtr Future pointer. + * @param err Error. + */ + private static void writeFutureError(final PlatformContext ctx, long futPtr, Throwable err) { + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx outWriter = ctx.writer(out); + + outWriter.writeString(err.getClass().getName()); + outWriter.writeString(err.getMessage()); + + PlatformUtils.writeErrorData(err, outWriter); + + out.synchronize(); + + ctx.gateway().futureError(futPtr, mem.pointer()); + } + } + + /** + * Write result to a custom writer + * + * @param obj Object to write. + * @param err Error to write. + * @param ctx Interop context. + * @param writer Writer. + * @param futPtr Future pointer. + * @return Value indicating whether custom write was performed. When false, default write will be used. + */ + private static boolean writeToWriter(Object obj, Throwable err, PlatformContext ctx, Writer writer, long futPtr) { + boolean canWrite = writer.canWrite(obj, err); + + if (!canWrite) + return false; + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + + PortableRawWriterEx outWriter = ctx.writer(out); + + writer.write(outWriter, obj, err); + + out.synchronize(); + + ctx.gateway().futureObjectResult(futPtr, mem.pointer()); + } + + return true; + } + + /** + * Writer allowing special future result handling. + */ + public static interface Writer { + /** + * Write object. + * + * @param writer Writer. + * @param obj Object. + * @param err Error. + */ + public void write(PortableRawWriterEx writer, Object obj, Throwable err); + + /** + * Determines whether this writer can write given data. + * + * @param obj Object. + * @param err Error. + * @return Value indicating whether this writer can write given data. + */ + public boolean canWrite(Object obj, Throwable err); + } + + /** + * Listenable entry. + */ + private static interface Listenable { + /** + * Listen. + * + * @param lsnr Listener. + */ + public void listen(IgniteBiInClosure lsnr); + } + + /** + * Listenable around Ignite future. + */ + private static class FutureListenable implements Listenable { + /** Future. */ + private final IgniteFuture fut; + + /** + * Constructor. + * + * @param fut Future. + */ + public FutureListenable(IgniteFuture fut) { + this.fut = fut; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void listen(final IgniteBiInClosure lsnr) { + fut.listen(new IgniteInClosure() { + private static final long serialVersionUID = 0L; + + @Override public void apply(IgniteFuture fut0) { + try { + lsnr.apply(fut0.get(), null); + } + catch (Throwable err) { + lsnr.apply(null, err); + + if (err instanceof Error) + throw err; + } + } + }); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cdf82e94/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java index a620f8e..614346a 100644 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java @@ -523,6 +523,60 @@ public class PlatformUtils { } /** + * Writer error data. + * + * @param err Error. + * @param writer Writer. + */ + public static void writeErrorData(Throwable err, PortableRawWriterEx writer) { + writeErrorData(err, writer, null); + } + + /** + * Write error data. + * @param err Error. + * @param writer Writer. + * @param log Optional logger. + */ + public static void writeErrorData(Throwable err, PortableRawWriterEx writer, @Nullable IgniteLogger log) { + // Write additional data if needed. + if (err instanceof PlatformExtendedException) { + PlatformExtendedException err0 = (PlatformExtendedException)err; + + writer.writeBoolean(true); // Data exists. + + int pos = writer.out().position(); + + try { + writer.writeBoolean(true); // Optimistically assume that we will be able to write it. + err0.writeData(writer); + } + catch (Exception e) { + if (log != null) + U.warn(log, "Failed to write interop exception data: " + e.getMessage(), e); + + writer.out().position(pos); + + writer.writeBoolean(false); // Error occurred. + writer.writeString(e.getClass().getName()); + + String innerMsg; + + try { + innerMsg = e.getMessage(); + } + catch (Exception innerErr) { + innerMsg = "Exception message is not available."; + } + + writer.writeString(innerMsg); + } + } + else + writer.writeBoolean(false); + } + + /** * Private constructor. */ private PlatformUtils() {