Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6F5F117ED3 for ; Wed, 4 Mar 2015 18:17:06 +0000 (UTC) Received: (qmail 14723 invoked by uid 500); 4 Mar 2015 18:17:06 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 14643 invoked by uid 500); 4 Mar 2015 18:17:06 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 14633 invoked by uid 99); 4 Mar 2015 18:17:06 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Mar 2015 18:17:06 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 04 Mar 2015 18:16:27 +0000 Received: (qmail 11014 invoked by uid 99); 4 Mar 2015 18:16:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Mar 2015 18:16:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5E346E0F58; Wed, 4 Mar 2015 18:16:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yzhdanov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 04 Mar 2015 18:16:25 -0000 Message-Id: <7d29d1e770e5428a9318ab3e05096106@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/5] incubator-ignite git commit: futures: api cleanup X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index eed273d..c09e51b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -420,7 +420,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } } catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx, e); + return new GridFinishedFuture<>(e); } // Register per-routine notifications listener if ordered messaging is used. @@ -459,7 +459,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Stop with exception if projection is empty. if (nodes.isEmpty() && !locIncluded) { - return new GridFinishedFuture<>(ctx, + return new GridFinishedFuture<>( new ClusterTopologyCheckedException("Failed to register remote continuous listener (projection is empty).")); } @@ -553,7 +553,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { hnd.onListenerRegistered(routineId, ctx); } catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx, + return new GridFinishedFuture<>( new IgniteCheckedException("Failed to register handler locally: " + hnd, e)); } } @@ -1617,6 +1617,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** */ private static final long serialVersionUID = 0L; + /** */ + private GridKernalContext ctx; + /** Consume ID. */ private UUID routineId; @@ -1641,7 +1644,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param routineId Consume ID. */ StartFuture(GridKernalContext ctx, UUID routineId) { - super(ctx); + this.ctx = ctx; this.routineId = routineId; } @@ -1701,18 +1704,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Timeout object. */ private volatile GridTimeoutObject timeoutObj; - /** - * Required by {@link Externalizable}. - */ - public StopFuture() { - // No-op. - } + /** */ + private GridKernalContext ctx; /** * @param ctx Kernal context. */ StopFuture(GridKernalContext ctx) { - super(ctx); + super(); + this.ctx = ctx; } /** @@ -1762,7 +1762,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param nodeId Master node ID. */ SyncMessageAckFuture(GridKernalContext ctx, UUID nodeId) { - super(ctx); + super(); this.nodeId = nodeId; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java index 5efcfe9..ae7052a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java @@ -48,7 +48,7 @@ class GridDataLoaderFuture extends GridFutureAdapter { * @param dataLdr Data loader. */ GridDataLoaderFuture(GridKernalContext ctx, IgniteDataLoaderImpl dataLdr) { - super(ctx); + super(); assert dataLdr != null; @@ -57,8 +57,6 @@ class GridDataLoaderFuture extends GridFutureAdapter { /** {@inheritDoc} */ @Override public boolean cancel() throws IgniteCheckedException { - checkValid(); - if (onCancelled()) { dataLdr.closeEx(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index ed3bbcb..1e20486 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -377,7 +377,7 @@ public class IgniteDataLoaderImpl implements IgniteDataLoader, Delay enterBusy(); try { - GridFutureAdapter resFut = new GridFutureAdapter<>(ctx); + GridFutureAdapter resFut = new GridFutureAdapter<>(); resFut.listenAsync(rmvActiveFut); @@ -397,7 +397,7 @@ public class IgniteDataLoaderImpl implements IgniteDataLoader, Delay return new IgniteFutureImpl<>(resFut); } catch (IgniteException e) { - return new IgniteFinishedFutureImpl<>(ctx, e); + return new IgniteFinishedFutureImpl<>(e); } finally { leaveBusy(); @@ -849,7 +849,7 @@ public class IgniteDataLoaderImpl implements IgniteDataLoader, Delay isLocNode = node.equals(ctx.discovery().localNode()); entries = newEntries(); - curFut = new GridFutureAdapter<>(ctx); + curFut = new GridFutureAdapter<>(); curFut.listenAsync(signalC); sem = new Semaphore(parallelOps); @@ -878,7 +878,7 @@ public class IgniteDataLoaderImpl implements IgniteDataLoader, Delay entries0 = entries; entries = newEntries(); - curFut = new GridFutureAdapter<>(ctx); + curFut = new GridFutureAdapter<>(); curFut.listenAsync(signalC); } } @@ -915,7 +915,7 @@ public class IgniteDataLoaderImpl implements IgniteDataLoader, Delay curFut0 = curFut; entries = newEntries(); - curFut = new GridFutureAdapter<>(ctx); + curFut = new GridFutureAdapter<>(); curFut.listenAsync(signalC); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index e61deee..52f762a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@ -303,7 +303,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc locVal += l; - return new GridFinishedFuture<>(ctx.kernalContext(), updated ? locVal : curVal); + return new GridFinishedFuture<>(updated ? locVal : curVal); } } finally { @@ -347,7 +347,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc locVal += l; - return new GridFinishedFuture<>(ctx.kernalContext(), updated ? locVal : curVal); + return new GridFinishedFuture<>(updated ? locVal : curVal); } } finally { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java index d0ef4ce..7dc9567 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java @@ -49,7 +49,7 @@ public class IgniteHadoopNoopProcessor extends IgniteHadoopProcessorAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) { - return new GridFinishedFutureEx<>(new IgniteCheckedException("Hadoop is not available.")); + return new GridFinishedFuture<>(new IgniteCheckedException("Hadoop is not available.")); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index e960422..c5ac658 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -406,7 +406,7 @@ public class IgfsDataManager extends IgfsManager { byte[] res = fut.get(); if (res == null) { - GridFutureAdapter rmtReadFut = new GridFutureAdapter<>(igfsCtx.kernalContext()); + GridFutureAdapter rmtReadFut = new GridFutureAdapter<>(); IgniteInternalFuture oldRmtReadFut = rmtReadFuts.putIfAbsent(key, rmtReadFut); @@ -599,7 +599,7 @@ public class IgfsDataManager extends IgfsManager { if (log.isDebugEnabled()) log.debug("Cannot delete content of not-data file: " + fileInfo); - return new GridFinishedFuture<>(igfsCtx.kernalContext()); + return new GridFinishedFuture<>(); } else return delWorker.deleteAsync(fileInfo); @@ -1256,14 +1256,14 @@ public class IgfsDataManager extends IgfsManager { // Additional size check. if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) - return new GridFinishedFuture(igfsCtx.kernalContext(), + return new GridFinishedFuture( new IgfsOutOfSpaceException("Failed to write data block (IGFS maximum data size " + "exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() + ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']')); } catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(igfsCtx.kernalContext(), new IgniteCheckedException("Failed to store data " + + return new GridFinishedFuture<>(new IgniteCheckedException("Failed to store data " + "block due to unexpected exception.", e)); } } @@ -1674,7 +1674,7 @@ public class IgfsDataManager extends IgfsManager { * Gracefully stops worker by adding STOP_INFO to queue. */ private void stop() { - delReqs.offer(F.t(new GridFutureAdapter<>(igfsCtx.kernalContext()), stopInfo)); + delReqs.offer(F.t(new GridFutureAdapter<>(), stopInfo)); } /** @@ -1682,7 +1682,7 @@ public class IgfsDataManager extends IgfsManager { * @return Future which completes when entry is actually removed. */ private IgniteInternalFuture deleteAsync(IgfsFileInfo info) { - GridFutureAdapter fut = new GridFutureAdapter<>(igfsCtx.kernalContext()); + GridFutureAdapter fut = new GridFutureAdapter<>(); delReqs.offer(F.t(fut, info)); @@ -1795,7 +1795,7 @@ public class IgfsDataManager extends IgfsManager { * @param fileId File id. */ private WriteCompletionFuture(GridKernalContext ctx, IgniteUuid fileId) { - super(ctx); + super(); assert fileId != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index eff987e..4fd5f87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -1604,9 +1604,9 @@ public final class IgfsImpl implements IgfsEx { IgniteUuid id = meta.softDelete(null, null, ROOT_ID); if (id == null) - return new GridFinishedFuture(igfsCtx.kernalContext()); + return new GridFinishedFuture(); else { - GridFutureAdapter fut = new GridFutureAdapter<>(igfsCtx.kernalContext()); + GridFutureAdapter fut = new GridFutureAdapter<>(); GridFutureAdapter oldFut = delFuts.putIfAbsent(id, fut); @@ -1625,7 +1625,7 @@ public final class IgfsImpl implements IgfsEx { } } catch (IgniteCheckedException e) { - return new GridFinishedFuture(igfsCtx.kernalContext(), e); + return new GridFinishedFuture(e); } } @@ -1640,7 +1640,7 @@ public final class IgfsImpl implements IgfsEx { GridCompoundFuture resFut = new GridCompoundFuture<>(igfsCtx.kernalContext()); for (IgniteUuid id : ids) { - GridFutureAdapter fut = new GridFutureAdapter<>(igfsCtx.kernalContext()); + GridFutureAdapter fut = new GridFutureAdapter<>(); IgniteInternalFuture oldFut = delFuts.putIfAbsent(id, fut); @@ -1662,7 +1662,7 @@ public final class IgfsImpl implements IgfsEx { return resFut; } else - return new GridFinishedFuture<>(igfsCtx.kernalContext()); + return new GridFinishedFuture<>(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java index 1d5ba1a..8a8b858 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java @@ -118,7 +118,7 @@ class IgfsIpcHandler implements IgfsServerHandler { case LIST_PATHS: { IgfsMessage res = execute(ses, cmd, msg, in); - fut = res == null ? null : new GridFinishedFuture<>(ctx, res); + fut = res == null ? null : new GridFinishedFuture<>(res); break; } @@ -138,7 +138,7 @@ class IgfsIpcHandler implements IgfsServerHandler { return fut; } catch (Exception e) { - return new GridFinishedFuture<>(ctx, e); + return new GridFinishedFuture<>(e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index f503161..92b39f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -193,10 +193,10 @@ public class GridQueryProcessor extends GridProcessorAdapter { */ private IgniteInternalFuture rebuildIndexes(@Nullable final String space, @Nullable final TypeDescriptor desc) { if (idx == null) - return new GridFinishedFuture<>(ctx, new IgniteCheckedException("Indexing is disabled.")); + return new GridFinishedFuture<>(new IgniteCheckedException("Indexing is disabled.")); if (desc == null || !desc.registered()) - return new GridFinishedFuture(ctx); + return new GridFinishedFuture(); final GridWorkerFuture fut = new GridWorkerFuture(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java index 8c9ef1d..6c1cedb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java @@ -90,7 +90,7 @@ public class GridRestProcessor extends GridProcessorAdapter { */ private IgniteInternalFuture handleAsync0(final GridRestRequest req) { if (!busyLock.tryReadLock()) - return new GridFinishedFuture<>(ctx, + return new GridFinishedFuture<>( new IgniteCheckedException("Failed to handle request (received request while stopping grid).")); try { @@ -156,7 +156,7 @@ public class GridRestProcessor extends GridProcessorAdapter { startLatch.await(); } catch (InterruptedException e) { - return new GridFinishedFuture<>(ctx, new IgniteCheckedException("Failed to handle request " + + return new GridFinishedFuture<>(new IgniteCheckedException("Failed to handle request " + "(protocol handler was interrupted when awaiting grid start).", e)); } } @@ -185,10 +185,10 @@ public class GridRestProcessor extends GridProcessorAdapter { U.warn(log, "Cannot update response session token: " + e1.getMessage()); } - return new GridFinishedFuture<>(ctx, res); + return new GridFinishedFuture<>(res); } catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx, new GridRestResponse(STATUS_AUTH_FAILED, e.getMessage())); + return new GridFinishedFuture<>(new GridRestResponse(STATUS_AUTH_FAILED, e.getMessage())); } } @@ -199,7 +199,7 @@ public class GridRestProcessor extends GridProcessorAdapter { IgniteInternalFuture res = hnd == null ? null : hnd.handleAsync(req); if (res == null) - return new GridFinishedFuture<>(ctx, + return new GridFinishedFuture<>( new IgniteCheckedException("Failed to find registered handler for command: " + req.command())); final SecurityContext subjCtx0 = subjCtx; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java index 983dd55..c63e414 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java @@ -297,12 +297,12 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { catch (IgniteException e) { U.error(log, "Failed to execute cache command: " + req, e); - return new GridFinishedFuture<>(ctx, e); + return new GridFinishedFuture<>(e); } catch (IgniteCheckedException e) { U.error(log, "Failed to execute cache command: " + req, e); - return new GridFinishedFuture<>(ctx, e); + return new GridFinishedFuture<>(e); } finally { if (log.isDebugEnabled()) @@ -1043,7 +1043,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { assert metrics != null; - return new GridFinishedFuture(ctx, new GridCacheRestMetrics( + return new GridFinishedFuture(new GridCacheRestMetrics( (int)metrics.getCacheGets(), (int)(metrics.getCacheRemovals() + metrics.getCachePuts()), (int)metrics.getCacheHits(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java index 50d82e6..5ee6418 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheQueryCommandHandler.java @@ -92,7 +92,7 @@ public class GridCacheQueryCommandHandler extends GridRestCommandHandlerAdapter } default: - return new GridFinishedFutureEx<>(new IgniteCheckedException("Unsupported query command: " + req.command())); + return new GridFinishedFuture<>(new IgniteCheckedException("Unsupported query command: " + req.command())); } } @@ -124,7 +124,7 @@ public class GridCacheQueryCommandHandler extends GridRestCommandHandlerAdapter return ctx.closure().callLocalSafe(c, false); else { if (ctx.discovery().node(destId) == null) - return new GridFinishedFutureEx<>(new IgniteCheckedException("Destination node ID has left the grid " + + return new GridFinishedFuture<>(new IgniteCheckedException("Destination node ID has left the grid " + "(retry the query): " + destId)); ctx.task().setThreadContext(TC_NO_FAILOVER, true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java index c45f2c7..5abbb77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java @@ -83,13 +83,13 @@ public class DataStructuresCommandHandler extends GridRestCommandHandlerAdapter IgniteCheckedException err = new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("key")); - return new GridFinishedFuture(ctx, err); + return new GridFinishedFuture(err); } else if (req.delta() == null) { IgniteCheckedException err = new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("delta")); - return new GridFinishedFuture(ctx, err); + return new GridFinishedFuture(err); } return ctx.closure().callLocalSafe(new Callable() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java index 78b6bd1..87f0fc3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskCommandHandler.java @@ -136,7 +136,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter { catch (IgniteCheckedException e) { U.error(log, "Failed to execute task command: " + req, e); - return new GridFinishedFuture<>(ctx, e); + return new GridFinishedFuture<>(e); } finally { if (log.isDebugEnabled()) @@ -159,7 +159,7 @@ public class GridTaskCommandHandler extends GridRestCommandHandlerAdapter { GridRestTaskRequest req0 = (GridRestTaskRequest) req; - final GridFutureAdapter fut = new GridFutureAdapter<>(ctx); + final GridFutureAdapter fut = new GridFutureAdapter<>(); final GridRestResponse res = new GridRestResponse(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java index 7c2a15b..d563f9f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java @@ -94,7 +94,7 @@ public class GridTopologyCommandHandler extends GridRestCommandHandlerAdapter { final String ip = req0.nodeIp(); if (id == null && ip == null) - return new GridFinishedFuture<>(ctx, new IgniteCheckedException( + return new GridFinishedFuture<>(new IgniteCheckedException( "Failed to handle request (either id or ip should be specified).")); ClusterNode node; @@ -131,7 +131,7 @@ public class GridTopologyCommandHandler extends GridRestCommandHandlerAdapter { if (log.isDebugEnabled()) log.debug("Handled topology REST request [res=" + res + ", req=" + req + ']'); - return new GridFinishedFuture<>(ctx, res); + return new GridFinishedFuture<>(res); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java index c66de86..2bfb704 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java @@ -54,6 +54,6 @@ public class GridVersionCommandHandler extends GridRestCommandHandlerAdapter { assert SUPPORTED_COMMANDS.contains(req.command()); - return new GridFinishedFuture<>(ctx, new GridRestResponse(VER_STR)); + return new GridFinishedFuture<>(new GridRestResponse(VER_STR)); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java index 43e47cd..aa1abd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java @@ -135,8 +135,8 @@ public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter apply(GridRestResponse res, Exception e) { return handleRequest0(ses, req, cmd); } - }, - ctx); + } + ); } if (f != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java index da4733b..f05fdf6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceDeploymentFuture.java @@ -37,7 +37,7 @@ public class GridServiceDeploymentFuture extends GridFutureAdapter { * @param cfg Configuration. */ public GridServiceDeploymentFuture(GridKernalContext ctx, ServiceConfiguration cfg) { - super(ctx); + super(); this.cfg = cfg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 1793beb..aac9d90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -386,7 +386,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { U.error(log, "Failed to deploy service: " + cfg.getName(), e); - return new GridFinishedFuture<>(ctx, e); + return new GridFinishedFuture<>(e); } } } @@ -398,7 +398,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { public IgniteInternalFuture cancel(String name) { while (true) { try { - GridFutureAdapter fut = new GridFutureAdapter<>(ctx); + GridFutureAdapter fut = new GridFutureAdapter<>(); GridFutureAdapter old; @@ -424,7 +424,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { catch (IgniteCheckedException e) { log.error("Failed to undeploy service: " + name, e); - return new GridFinishedFuture<>(ctx, e); + return new GridFinishedFuture<>(e); } } } @@ -446,7 +446,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { futs.add(cancel(dep.configuration().getName())); } - return futs.isEmpty() ? new GridFinishedFuture<>(ctx) : new GridCompoundFuture(ctx, null, futs); + return futs.isEmpty() ? new GridFinishedFuture<>() : new GridCompoundFuture(ctx, null, futs); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java index 31c4c0e..beb2820 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerStageExecutionFuture.java @@ -114,8 +114,6 @@ public class GridStreamerStageExecutionFuture extends GridFutureAdapter String stageName, Collection evts ) { - super(streamer.kernalContext()); - assert streamer != null; assert stageName != null; assert evts != null; @@ -226,7 +224,7 @@ public class GridStreamerStageExecutionFuture extends GridFutureAdapter } } catch (IgniteCheckedException e) { - onFailed(ctx.localNodeId(), e); + onFailed(streamer.kernalContext().localNodeId(), e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index 0750259..7139564 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -78,7 +78,7 @@ public class GridCompoundFuture extends GridFutureAdapter { * @param ctx Context. */ public GridCompoundFuture(GridKernalContext ctx) { - super(ctx); + super(); } /** @@ -86,7 +86,7 @@ public class GridCompoundFuture extends GridFutureAdapter { * @param rdc Reducer. */ public GridCompoundFuture(GridKernalContext ctx, @Nullable IgniteReducer rdc) { - super(ctx); + super(); this.rdc = rdc; } @@ -98,7 +98,7 @@ public class GridCompoundFuture extends GridFutureAdapter { */ public GridCompoundFuture(GridKernalContext ctx, @Nullable IgniteReducer rdc, @Nullable Iterable> futs) { - super(ctx); + super(); this.rdc = rdc; @@ -248,14 +248,14 @@ public class GridCompoundFuture extends GridFutureAdapter { res.compareAndSet(null, rdc.reduce(), false, true); } catch (RuntimeException e) { - U.error(log, "Failed to execute compound future reducer: " + this, e); + U.error(null, "Failed to execute compound future reducer: " + this, e); onDone(e); return; } catch (AssertionError e) { - U.error(log, "Failed to execute compound future reducer: " + this, e); + U.error(null, "Failed to execute compound future reducer: " + this, e); onDone(e); @@ -320,13 +320,13 @@ public class GridCompoundFuture extends GridFutureAdapter { res.compareAndSet(null, rdc.reduce(), false, true); } catch (RuntimeException e) { - U.error(log, "Failed to execute compound future reducer: " + this, e); + U.error(null, "Failed to execute compound future reducer: " + this, e); // Exception in reducer is a bug, so we bypass checkComplete here. onDone(e); } catch (AssertionError e) { - U.error(log, "Failed to execute compound future reducer: " + this, e); + U.error(null, "Failed to execute compound future reducer: " + this, e); // Bypass checkComplete because need to rethrow. onDone(e); @@ -335,36 +335,36 @@ public class GridCompoundFuture extends GridFutureAdapter { } } catch (IgniteTxOptimisticCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Optimistic failure [fut=" + GridCompoundFuture.this + ", err=" + e + ']'); +// if (log.isDebugEnabled()) +// log.debug("Optimistic failure [fut=" + GridCompoundFuture.this + ", err=" + e + ']'); err.compareAndSet(null, e); } catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Topology exception [fut=" + GridCompoundFuture.this + ", err=" + e + ']'); +// if (log.isDebugEnabled()) +// log.debug("Topology exception [fut=" + GridCompoundFuture.this + ", err=" + e + ']'); err.compareAndSet(null, e); } catch (IgniteFutureCancelledCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to execute compound future reducer [lsnr=" + this + ", e=" + e + ']'); +// if (log.isDebugEnabled()) +// log.debug("Failed to execute compound future reducer [lsnr=" + this + ", e=" + e + ']'); err.compareAndSet(null, e); } catch (IgniteCheckedException e) { - if (!ignoreFailure(e)) - U.error(log, "Failed to execute compound future reducer: " + this, e); +// if (!ignoreFailure(e)) +// U.error(log, "Failed to execute compound future reducer: " + this, e); err.compareAndSet(null, e); } catch (RuntimeException e) { - U.error(log, "Failed to execute compound future reducer: " + this, e); + U.error(null, "Failed to execute compound future reducer: " + this, e); err.compareAndSet(null, e); } catch (AssertionError e) { - U.error(log, "Failed to execute compound future reducer: " + this, e); + U.error(null, "Failed to execute compound future reducer: " + this, e); // Bypass checkComplete because need to rethrow. onDone(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java index 41f0ba4..b3ec22d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java @@ -45,13 +45,14 @@ public class GridEmbeddedFuture extends GridFutureAdapter { } /** - * @param ctx Context. * @param embedded Embedded future. * @param c Closure to execute upon completion of embedded future. */ - public GridEmbeddedFuture(GridKernalContext ctx, IgniteInternalFuture embedded, final IgniteBiClosure c) { - super(ctx); - + public GridEmbeddedFuture( + IgniteInternalFuture embedded, + final IgniteBiClosure c, + boolean fake + ) { assert embedded != null; assert c != null; @@ -77,30 +78,11 @@ public class GridEmbeddedFuture extends GridFutureAdapter { /** * Embeds futures. Specific change order of arguments to avoid conflicts. - * - * @param syncNotify Synchronous notify flag. - * @param embedded Closure. - * @param c Closure which runs upon completion of embedded closure and which returns another future. - * @param ctx Context. - */ - public GridEmbeddedFuture(boolean syncNotify, IgniteInternalFuture embedded, IgniteBiClosure> c, - GridKernalContext ctx) { - this(embedded, c, ctx); - - syncNotify(syncNotify); - } - - /** - * Embeds futures. Specific change order of arguments to avoid conflicts. - * - * @param ctx Context. - * @param embedded Closure. + * @param embedded Closure. * @param c Closure which runs upon completion of embedded closure and which returns another future. */ - public GridEmbeddedFuture(IgniteInternalFuture embedded, final IgniteBiClosure> c, - GridKernalContext ctx) { - super(ctx); - + public GridEmbeddedFuture( + IgniteInternalFuture embedded, final IgniteBiClosure> c) { assert embedded != null; assert c != null; @@ -158,15 +140,15 @@ public class GridEmbeddedFuture extends GridFutureAdapter { /** * Embeds futures. * - * @param ctx Context. * @param embedded Future. * @param c1 Closure which runs upon completion of embedded future and which returns another future. * @param c2 Closure will runs upon completion of future returned by {@code c1} closure. */ - public GridEmbeddedFuture(GridKernalContext ctx, IgniteInternalFuture embedded, final IgniteBiClosure> c1, final IgniteBiClosure c2) { - super(ctx); - + public GridEmbeddedFuture( + IgniteInternalFuture embedded, final IgniteBiClosure> c1, + final IgniteBiClosure c2 + ) { assert embedded != null; assert c1 != null; assert c2 != null; @@ -268,7 +250,7 @@ public class GridEmbeddedFuture extends GridFutureAdapter { applyx(f); } catch (IgniteIllegalStateException ignore) { - U.warn(log, "Will not execute future listener (grid is stopping): " + ctx.gridName()); + U.warn(null, "Will not execute future listener (grid is stopping): " + this); } catch (Exception e) { onDone(e); @@ -300,7 +282,7 @@ public class GridEmbeddedFuture extends GridFutureAdapter { applyx(f); } catch (IgniteIllegalStateException ignore) { - U.warn(log, "Will not execute future listener (grid is stopping): " + ctx.gridName()); + U.warn(null, "Will not execute future listener (grid is stopping): " + this); } catch (Exception e) { onDone(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java index c2f1b89..31f6734 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java @@ -21,40 +21,24 @@ import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; -import java.io.*; import java.util.concurrent.*; -import static org.apache.ignite.IgniteSystemProperties.*; - /** * Future that is completed at creation time. */ -public class GridFinishedFuture implements IgniteInternalFuture, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Synchronous notification flag. */ - private static final boolean SYNC_NOTIFY = IgniteSystemProperties.getBoolean(IGNITE_FUT_SYNC_NOTIFICATION, true); - +public class GridFinishedFuture implements IgniteInternalFuture { /** Complete value. */ private T t; /** Error. */ private Throwable err; - /** Context. */ - protected GridKernalContext ctx; - /** Start time. */ private final long startTime = U.currentTimeMillis(); - /** Synchronous notification flag. */ - private volatile boolean syncNotify = SYNC_NOTIFY; - /** - * Empty constructor required for {@link Externalizable}. + * Creates finished future with complete value. */ public GridFinishedFuture() { // No-op. @@ -63,43 +47,31 @@ public class GridFinishedFuture implements IgniteInternalFuture, Externali /** * Creates finished future with complete value. * - * @param ctx Context. - */ - public GridFinishedFuture(GridKernalContext ctx) { - assert ctx != null; - - this.ctx = ctx; - - t = null; - err = null; - } - - /** - * Creates finished future with complete value. - * - * @param ctx Context. * @param t Finished value. */ - public GridFinishedFuture(GridKernalContext ctx, T t) { - assert ctx != null; - - this.ctx = ctx; + public GridFinishedFuture(T t) { this.t = t; - - err = null; } /** - * @param ctx Context. * @param err Future error. */ - public GridFinishedFuture(GridKernalContext ctx, Throwable err) { - assert ctx != null; - - this.ctx = ctx; + public GridFinishedFuture(Throwable err) { this.err = err; + } - t = null; + /** + * @return Value of error. + */ + protected Throwable error() { + return err; + } + + /** + * @return Value of result. + */ + protected T result() { + return t; } /** {@inheritDoc} */ @@ -113,26 +85,6 @@ public class GridFinishedFuture implements IgniteInternalFuture, Externali } /** {@inheritDoc} */ - @Override public boolean concurrentNotify() { - return false; - } - - /** {@inheritDoc} */ - @Override public void concurrentNotify(boolean concurNotify) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void syncNotify(boolean syncNotify) { - this.syncNotify = syncNotify; - } - - /** {@inheritDoc} */ - @Override public boolean syncNotify() { - return syncNotify; - } - - /** {@inheritDoc} */ @Override public boolean cancel() { return false; } @@ -167,52 +119,24 @@ public class GridFinishedFuture implements IgniteInternalFuture, Externali /** {@inheritDoc} */ @Override public void listenAsync(final IgniteInClosure> lsnr) { - if (ctx == null) - throw new IllegalStateException("Cannot attach listener to deserialized future (context is null): " + this); - - if (lsnr != null) { - if (syncNotify) - lsnr.apply(this); - else - ctx.closure().runLocalSafe(new GPR() { - @Override public void run() { - lsnr.apply(GridFinishedFuture.this); - } - }, true); - } + if (lsnr != null) + lsnr.apply(this); } /** {@inheritDoc} */ @Override public IgniteInternalFuture chain(final IgniteClosure, R> doneCb) { - GridFutureAdapter fut = new GridFutureAdapter(ctx, syncNotify) { + GridFutureAdapter fut = new GridFutureAdapter() { @Override public String toString() { return "ChainFuture[orig=" + GridFinishedFuture.this + ", doneCb=" + doneCb + ']'; } }; - listenAsync(new GridFutureChainListener<>(ctx, fut, doneCb)); + listenAsync(new GridFutureChainListener<>(fut, doneCb)); return fut; } /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(t); - out.writeObject(err); - out.writeObject(ctx); - out.writeBoolean(syncNotify); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"unchecked"}) - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - t = (T)in.readObject(); - err = (Throwable)in.readObject(); - ctx = (GridKernalContext)in.readObject(); - syncNotify = in.readBoolean(); - } - - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridFinishedFuture.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFutureEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFutureEx.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFutureEx.java deleted file mode 100644 index 84323be..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFutureEx.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.future; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.concurrent.*; - -/** - * Future that is completed at creation time. This future is different from - * {@link GridFinishedFuture} as it does not take context as a parameter and - * performs notifications in the same thread. - */ -public class GridFinishedFutureEx implements IgniteInternalFuture, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Complete value. */ - private T t; - - /** Error. */ - private Throwable err; - - /** Start time. */ - private final long startTime = U.currentTimeMillis(); - - /** - * Created finished future with {@code null} value. - */ - public GridFinishedFutureEx() { - this(null, null); - } - - /** - * Creates finished future with complete value. - * - * @param t Finished value. - */ - public GridFinishedFutureEx(T t) { - this(t, null); - } - - /** - * @param err Future error. - */ - public GridFinishedFutureEx(Throwable err) { - this(null, err); - } - - /** - * Creates finished future with complete value and error. - * - * @param t Finished value. - * @param err Future error. - */ - public GridFinishedFutureEx(T t, Throwable err) { - this.err = err; - - this.t = t; - } - - /** {@inheritDoc} */ - @Override public long startTime() { - return startTime; - } - - /** {@inheritDoc} */ - @Override public long duration() { - return 0; - } - - /** {@inheritDoc} */ - @Override public boolean concurrentNotify() { - return false; - } - - /** {@inheritDoc} */ - @Override public void concurrentNotify(boolean concurNotify) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void syncNotify(boolean syncNotify) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean syncNotify() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean cancel() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isCancelled() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isDone() { - return true; - } - - /** {@inheritDoc} */ - @Override public T get() throws IgniteCheckedException { - if (err != null) - throw U.cast(err); - - return t; - } - - /** {@inheritDoc} */ - @Override public T get(long timeout) throws IgniteCheckedException { - return get(); - } - - /** {@inheritDoc} */ - @Override public T get(long timeout, TimeUnit unit) throws IgniteCheckedException { - return get(); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture chain(IgniteClosure, R> doneCb) { - try { - return new GridFinishedFutureEx<>(doneCb.apply(this)); - } - catch (GridClosureException e) { - return new GridFinishedFutureEx<>(U.unwrap(e)); - } - catch (RuntimeException | Error e) { - U.warn(null, "Failed to notify chained future [doneCb=" + doneCb + ", err=" + e.getMessage() + ']'); - - throw e; - } - } - - /** {@inheritDoc} */ - @SuppressWarnings({"unchecked"}) - @Override public void listenAsync(IgniteInClosure> lsnr) { - if (lsnr != null) - lsnr.apply(this); - } - - /** - * @return {@code True} if future failed. - */ - protected boolean failed() { - return err != null; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(t); - out.writeObject(err); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"unchecked"}) - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - t = (T)in.readObject(); - err = (Throwable)in.readObject(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridFinishedFutureEx.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java index 4beb89a..b107c15 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java @@ -24,35 +24,17 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; -import java.io.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; -import static org.apache.ignite.IgniteSystemProperties.*; - /** * Future adapter. */ -public class GridFutureAdapter extends AbstractQueuedSynchronizer implements IgniteInternalFuture, - Externalizable { +public class GridFutureAdapter extends AbstractQueuedSynchronizer implements IgniteInternalFuture { /** */ private static final long serialVersionUID = 0L; - /** Logger reference. */ - private static final AtomicReference logRef = new AtomicReference<>(); - - /** Logger. */ - protected static IgniteLogger log; - - /** Synchronous notification flag. */ - private static final boolean SYNC_NOTIFY = IgniteSystemProperties.getBoolean(IGNITE_FUT_SYNC_NOTIFICATION, true); - - /** Concurrent notification flag. */ - private static final boolean CONCUR_NOTIFY = - IgniteSystemProperties.getBoolean(IGNITE_FUT_CONCURRENT_NOTIFICATION, false); - /** Initial state. */ private static final int INIT = 0; @@ -75,52 +57,12 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements /** Future end time. */ private volatile long endTime; - /** Set to {@code false} on deserialization whenever incomplete future is serialized. */ - private boolean valid = true; - /** Asynchronous listeners. */ private Collection>> lsnrs; - /** Context. */ - protected GridKernalContext ctx; - - /** Synchronous notification flag. */ - private volatile boolean syncNotify = SYNC_NOTIFY; - - /** Concurrent notification flag. */ - private volatile boolean concurNotify = CONCUR_NOTIFY; - /** */ private final Object mux = new Object(); - /** - * Empty constructor required for {@link Externalizable}. - */ - public GridFutureAdapter() { - // No-op. - } - - /** - * @param ctx Kernal context. - */ - public GridFutureAdapter(GridKernalContext ctx) { - this(ctx, SYNC_NOTIFY); - } - - /** - * @param syncNotify Synchronous notify flag. - * @param ctx Kernal context. - */ - public GridFutureAdapter(GridKernalContext ctx, boolean syncNotify) { - assert ctx != null; - - this.syncNotify = syncNotify; - - this.ctx = ctx; - - log = U.logger(ctx, logRef, GridFutureAdapter.class); - } - /** {@inheritDoc} */ @Override public long startTime() { return startTime; @@ -140,48 +82,10 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements return endTime; } - /** {@inheritDoc} */ - @Override public boolean concurrentNotify() { - return concurNotify; - } - - /** {@inheritDoc} */ - @Override public void concurrentNotify(boolean concurNotify) { - this.concurNotify = concurNotify; - } - - /** {@inheritDoc} */ - @Override public boolean syncNotify() { - return syncNotify; - } - - /** {@inheritDoc} */ - @Override public void syncNotify(boolean syncNotify) { - this.syncNotify = syncNotify; - } - - /** - * Checks that future is in usable state. - */ - protected void checkValid() { - if (!valid) - throw new IllegalStateException("Incomplete future was serialized and cannot " + - "be used after deserialization."); - } - - /** - * @return Valid flag. - */ - protected boolean isValid() { - return valid; - } - /** * @return Value of error. */ protected Throwable error() { - checkValid(); - return err; } @@ -189,15 +93,11 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements * @return Value of result. */ protected R result() { - checkValid(); - return res; } /** {@inheritDoc} */ @Override public R get() throws IgniteCheckedException { - checkValid(); - try { if (endTime == 0) acquireSharedInterruptibly(0); @@ -228,8 +128,6 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements A.ensure(timeout >= 0, "timeout cannot be negative: " + timeout); A.notNull(unit, "unit"); - checkValid(); - try { return get0(unit.toNanos(timeout)); } @@ -263,8 +161,6 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements /** {@inheritDoc} */ @Override public void listenAsync(@Nullable final IgniteInClosure> lsnr) { if (lsnr != null) { - checkValid(); - boolean done = isDone(); if (!done) { @@ -281,27 +177,14 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements } if (done) { - try { - if (syncNotify) - notifyListener(lsnr); - else { - ctx.closure().runLocalSafe(new GPR() { - @Override public void run() { - notifyListener(lsnr); - } - }, true); - } - } - catch (IllegalStateException ignore) { - U.warn(null, "Future notification will not proceed because grid is stopped: " + ctx.gridName()); - } + notifyListener(lsnr); } } } /** {@inheritDoc} */ @Override public IgniteInternalFuture chain(final IgniteClosure, T> doneCb) { - return new ChainFuture<>(ctx, syncNotify, this, doneCb); + return new ChainFuture<>(this, doneCb); } /** @@ -321,30 +204,8 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements assert !lsnrs0.isEmpty(); - if (concurNotify) { - for (final IgniteInClosure> lsnr : lsnrs0) - ctx.closure().runLocalSafe(new GPR() { - @Override public void run() { - notifyListener(lsnr); - } - }, true); - } - else { - // Always notify in the thread different from start thread. - if (!syncNotify) { - ctx.closure().runLocalSafe(new GPR() { - @Override public void run() { - // Since concurrent notifications are off, we notify - // all listeners in one thread. - for (IgniteInClosure> lsnr : lsnrs0) - notifyListener(lsnr); - } - }, true); - } - else - for (IgniteInClosure> lsnr : lsnrs0) - notifyListener(lsnr); - } + for (IgniteInClosure> lsnr : lsnrs0) + notifyListener(lsnr); } /** @@ -359,11 +220,11 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements lsnr.apply(this); } catch (IllegalStateException e) { - U.warn(null, "Failed to notify listener (is grid stopped?) [grid=" + ctx.gridName() + + U.warn(null, "Failed to notify listener (is grid stopped?) [fut=" + this + ", lsnr=" + lsnr + ", err=" + e.getMessage() + ']'); } catch (RuntimeException | Error e) { - U.error(log, "Failed to notify listener: " + lsnr, e); + U.error(null, "Failed to notify listener: " + lsnr, e); throw e; } @@ -376,8 +237,6 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements * indeed did happen. */ @Override public boolean cancel() throws IgniteCheckedException { - checkValid(); - return false; } @@ -398,8 +257,6 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements /** {@inheritDoc} */ @Override public boolean isCancelled() { - checkValid(); - return getState() == CANCELLED; } @@ -454,8 +311,6 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements * @return {@code True} if result was set by this call. */ private boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) { - checkValid(); - boolean notify = false; try { @@ -510,54 +365,6 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements } /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - int state0 = getState(); - - out.writeByte(state0); - out.writeBoolean(syncNotify); - out.writeBoolean(concurNotify); - - // Don't write any further if not done, as deserialized future - // will be invalid anyways. - if (state0 != INIT) { - try { - acquireSharedInterruptibly(0); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IOException("Thread has been interrupted.", e); - } - - out.writeObject(res); - out.writeObject(err); - out.writeObject(ctx); - } - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - int state0 = in.readByte(); - - setState(state0); - - syncNotify = in.readBoolean(); - concurNotify = in.readBoolean(); - - if (state0 == INIT) - valid = false; - else { - res = (R)in.readObject(); - err = (Throwable)in.readObject(); - ctx = (GridKernalContext)in.readObject(); - - // Prevent any thread from being locked on deserialized future. - // This will also set 'endTime'. - releaseShared(0); - } - } - - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridFutureAdapter.class, this, "state", state()); } @@ -583,19 +390,19 @@ public class GridFutureAdapter extends AbstractQueuedSynchronizer implements } /** - * @param ctx Context. - * @param syncNotify Sync notify flag. * @param fut Future. * @param doneCb Closure. */ - ChainFuture(GridKernalContext ctx, boolean syncNotify, - GridFutureAdapter fut, IgniteClosure, T> doneCb) { - super(ctx, syncNotify); + ChainFuture( + GridFutureAdapter fut, + IgniteClosure, T> doneCb + ) { + super(); this.fut = fut; this.doneCb = doneCb; - fut.listenAsync(new GridFutureChainListener<>(ctx, this, doneCb)); + fut.listenAsync(new GridFutureChainListener<>(this, doneCb)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapterEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapterEx.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapterEx.java deleted file mode 100644 index ccce6e7..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapterEx.java +++ /dev/null @@ -1,500 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util.future; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; - -/** - * Future adapter without kernal context. - */ -public class GridFutureAdapterEx extends AbstractQueuedSynchronizer implements IgniteInternalFuture, - Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Initial state. */ - private static final int INIT = 0; - - /** Cancelled state. */ - private static final int CANCELLED = 1; - - /** Done state. */ - private static final int DONE = 2; - - /** Result. */ - @GridToStringInclude - private R res; - - /** Error. */ - private Throwable err; - - /** Future start time. */ - private final long startTime = U.currentTimeMillis(); - - /** Future end time. */ - private volatile long endTime; - - /** Set to {@code false} on deserialization whenever incomplete future is serialized. */ - private boolean valid = true; - - /** Asynchronous listener. */ - private final ConcurrentLinkedDeque8>> - lsnrs = new ConcurrentLinkedDeque8<>(); - - /** - * Empty constructor required for {@link Externalizable}. - */ - @SuppressWarnings("RedundantNoArgConstructor") - public GridFutureAdapterEx() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public long startTime() { - return startTime; - } - - /** {@inheritDoc} */ - @Override public long duration() { - long endTime = this.endTime; - - return endTime == 0 ? U.currentTimeMillis() - startTime : endTime - startTime; - } - - /** {@inheritDoc} */ - @Override public boolean concurrentNotify() { - return false; - } - - /** {@inheritDoc} */ - @Override public void concurrentNotify(boolean concurNotify) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean syncNotify() { - return true; - } - - /** {@inheritDoc} */ - @Override public void syncNotify(boolean syncNotify) { - // No-op - } - - /** - * Checks that future is in usable state. - */ - protected void checkValid() { - if (!valid) - throw new IllegalStateException("Incomplete future was serialized and cannot " + - "be used after deserialization."); - } - - /** - * @return Valid flag. - */ - protected boolean isValid() { - return valid; - } - - /** - * @return Value of error. - */ - protected Throwable error() { - checkValid(); - - return err; - } - - /** - * @return Value of result. - */ - protected R result() { - checkValid(); - - return res; - } - - /** {@inheritDoc} */ - @Override public R get() throws IgniteCheckedException { - checkValid(); - - try { - if (endTime == 0) - acquireSharedInterruptibly(0); - - if (getState() == CANCELLED) - throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this); - - if (err != null) - throw U.cast(err); - - return res; - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public R get(long timeout) throws IgniteCheckedException { - // Do not replace with static import, as it may not compile. - return get(timeout, TimeUnit.MILLISECONDS); - } - - /** {@inheritDoc} */ - @Override public R get(long timeout, TimeUnit unit) throws IgniteCheckedException { - A.ensure(timeout >= 0, "timeout cannot be negative: " + timeout); - A.notNull(unit, "unit"); - - checkValid(); - - try { - return get0(unit.toNanos(timeout)); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.", e); - } - } - - /** - * @param nanosTimeout Timeout (nanoseconds). - * @return Result. - * @throws InterruptedException If interrupted. - * @throws org.apache.ignite.internal.IgniteFutureTimeoutCheckedException If timeout reached before computation completed. - * @throws IgniteCheckedException If error occurred. - */ - @Nullable protected R get0(long nanosTimeout) throws InterruptedException, IgniteCheckedException { - if (endTime == 0 && !tryAcquireSharedNanos(0, nanosTimeout)) - throw new IgniteFutureTimeoutCheckedException("Timeout was reached before computation completed."); - - if (getState() == CANCELLED) - throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this); - - if (err != null) - throw U.cast(err); - - return res; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"unchecked", "TooBroadScope"}) - @Override public void listenAsync(@Nullable final IgniteInClosure> lsnr) { - if (lsnr != null) { - checkValid(); - - boolean done; - - IgniteInClosure> lsnr0 = lsnr; - - done = isDone(); - - if (!done) { - lsnr0 = new IgniteInClosure>() { - private final AtomicBoolean called = new AtomicBoolean(); - - @Override public void apply(IgniteInternalFuture t) { - if (called.compareAndSet(false, true)) - lsnr.apply(t); - } - - @Override public boolean equals(Object o) { - return o != null && (o == this || o == lsnr || o.equals(lsnr)); - } - - @Override public String toString() { - return lsnr.toString(); - } - }; - - lsnrs.add(lsnr0); - - done = isDone(); // Double check. - } - - if (done) - notifyListener(lsnr0); - } - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture chain(final IgniteClosure, T> doneCb) { - final GridFutureAdapterEx fut = new GridFutureAdapterEx() { - @Override public String toString() { - return "ChainFuture[orig=" + GridFutureAdapterEx.this + ", doneCb=" + doneCb + ']'; - } - }; - - listenAsync(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture t) { - try { - fut.onDone(doneCb.apply(t)); - } - catch (GridClosureException e) { - fut.onDone(e.unwrap()); - } - catch (RuntimeException e) { - U.warn(null, "Failed to notify chained future (is grid stopped?) [, doneCb=" + doneCb + - ", err=" + e.getMessage() + ']'); - - fut.onDone(e); - - throw e; - } - catch (Error e) { - U.warn(null, "Failed to notify chained future (is grid stopped?) [doneCb=" + doneCb + - ", err=" + e.getMessage() + ']'); - - fut.onDone(e); - - throw e; - } - } - }); - - return fut; - } - - /** - * Notifies all registered listeners. - */ - private void notifyListeners() { - if (lsnrs.isEmptyx()) - return; - - for (IgniteInClosure> lsnr : lsnrs) - notifyListener(lsnr); - } - - /** - * Notifies single listener. - * - * @param lsnr Listener. - */ - private void notifyListener(IgniteInClosure> lsnr) { - assert lsnr != null; - - try { - lsnr.apply(this); - } - catch (IllegalStateException e) { - U.warn(null, "Failed to notify listener (is grid stopped?) [lsnr=" + lsnr + - ", err=" + e.getMessage() + ']'); - } - catch (RuntimeException | Error e) { - U.error(null, "Failed to notify listener: " + lsnr, e); - - throw e; - } - } - - /** - * Default no-op implementation that always returns {@code false}. - * Futures that do support cancellation should override this method - * and call {@link #onCancelled()} callback explicitly if cancellation - * indeed did happen. - */ - @Override public boolean cancel() throws IgniteCheckedException { - checkValid(); - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isDone() { - // Don't check for "valid" here, as "done" flag can be read - // even in invalid state. - return endTime != 0; - } - - /** {@inheritDoc} */ - @Override public boolean isCancelled() { - checkValid(); - - return getState() == CANCELLED; - } - - /** - * Callback to notify that future is finished with {@code null} result. - * This method must delegate to {@link #onDone(Object, Throwable)} method. - * - * @return {@code True} if result was set by this call. - */ - public final boolean onDone() { - return onDone(null, null); - } - - /** - * Callback to notify that future is finished. - * This method must delegate to {@link #onDone(Object, Throwable)} method. - * - * @param res Result. - * @return {@code True} if result was set by this call. - */ - public final boolean onDone(@Nullable R res) { - return onDone(res, null); - } - - /** - * Callback to notify that future is finished. - * This method must delegate to {@link #onDone(Object, Throwable)} method. - * - * @param err Error. - * @return {@code True} if result was set by this call. - */ - public final boolean onDone(@Nullable Throwable err) { - return onDone(null, err); - } - - /** - * Callback to notify that future is finished. Note that if non-{@code null} exception is passed in - * the result value will be ignored. - * - * @param res Optional result. - * @param err Optional error. - * @return {@code True} if result was set by this call. - */ - public boolean onDone(@Nullable R res, @Nullable Throwable err) { - return onDone(res, err, false); - } - - /** - * @param res Result. - * @param err Error. - * @param cancel {@code True} if future is being cancelled. - * @return {@code True} if result was set by this call. - */ - private boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) { - checkValid(); - - boolean notify = false; - - try { - if (compareAndSetState(INIT, cancel ? CANCELLED : DONE)) { - this.res = res; - this.err = err; - - notify = true; - - releaseShared(0); - - return true; - } - - return false; - } - finally { - if (notify) - notifyListeners(); - } - } - - /** - * Callback to notify that future is cancelled. - * - * @return {@code True} if cancel flag was set by this call. - */ - public boolean onCancelled() { - return onDone(null, null, true); - } - - /** {@inheritDoc} */ - @Override protected final int tryAcquireShared(int ignore) { - return endTime != 0 ? 1 : -1; - } - - /** {@inheritDoc} */ - @Override protected final boolean tryReleaseShared(int ignore) { - endTime = U.currentTimeMillis(); - - // Always signal after setting final done status. - return true; - } - - /** - * @return String representation of state. - */ - private String state() { - int s = getState(); - - return s == INIT ? "INIT" : s == CANCELLED ? "CANCELLED" : "DONE"; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - int state0 = getState(); - - out.writeByte(state0); - - // Don't write any further if not done, as deserialized future - // will be invalid anyways. - if (state0 != INIT) { - try { - acquireSharedInterruptibly(0); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IOException("Thread has been interrupted.", e); - } - - out.writeObject(res); - out.writeObject(err); - } - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - int state0 = in.readByte(); - - setState(state0); - - if (state0 == INIT) - valid = false; - else { - res = (R)in.readObject(); - err = (Throwable)in.readObject(); - - // Prevent any thread from being locked on deserialized future. - // This will also set 'endTime'. - releaseShared(0); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridFutureAdapterEx.class, this, "state", state()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1b46951/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java index 0599053..d98538e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java @@ -29,9 +29,6 @@ public class GridFutureChainListener implements IgniteInClosure fut; @@ -40,14 +37,13 @@ public class GridFutureChainListener implements IgniteInClosure fut, - IgniteClosure, R> doneCb) { - this.ctx = ctx; + public GridFutureChainListener( + GridFutureAdapter fut, + IgniteClosure, R> doneCb + ) { this.fut = fut; this.doneCb = doneCb; } @@ -61,8 +57,8 @@ public class GridFutureChainListener implements IgniteInClosure