Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2168317435 for ; Tue, 21 Apr 2015 11:23:09 +0000 (UTC) Received: (qmail 10047 invoked by uid 500); 21 Apr 2015 11:23:09 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 9977 invoked by uid 500); 21 Apr 2015 11:23:09 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 9948 invoked by uid 99); 21 Apr 2015 11:23:09 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Apr 2015 11:23:09 +0000 X-ASF-Spam-Status: No, hits=-0.0 required=5.0 tests=SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: local policy) Received: from [54.164.171.186] (HELO mx1-us-east.apache.org) (54.164.171.186) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Apr 2015 11:23:01 +0000 Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 2DDAE43EB6 for ; Tue, 21 Apr 2015 11:22:40 +0000 (UTC) Received: (qmail 9182 invoked by uid 99); 21 Apr 2015 11:22:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Apr 2015 11:22:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8DFCDE0E1F; Tue, 21 Apr 2015 11:22:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 21 Apr 2015 11:22:45 -0000 Message-Id: <5314bcec855c45a28bce0dc38e8fa95e@git.apache.org> In-Reply-To: <76c95d2701a841429b46bba3602788b0@git.apache.org> References: <76c95d2701a841429b46bba3602788b0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/12] incubator-ignite git commit: backport from 6.6.5 of gg-10102 X-Virus-Checked: Checked by ClamAV on apache.org backport from 6.6.5 of gg-10102 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/43659dcf Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/43659dcf Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/43659dcf Branch: refs/heads/ignite-446 Commit: 43659dcf670c66fe5ccb0d306a95e689b200ec47 Parents: bc025d9 Author: Denis Magda Authored: Tue Apr 21 12:57:10 2015 +0300 Committer: Denis Magda Committed: Tue Apr 21 12:57:10 2015 +0300 ---------------------------------------------------------------------- .../examples/ScalarContinuationExample.scala | 10 +- .../ignite/compute/ComputeJobContinuation.java | 2 + .../ignite/internal/GridJobContextImpl.java | 100 +++++++++++----- .../processors/job/GridJobHoldListener.java | 6 +- .../processors/job/GridJobProcessor.java | 22 ++-- .../internal/processors/job/GridJobWorker.java | 23 ++-- .../internal/GridContinuousTaskSelfTest.java | 114 +++++++++++++++++++ 7 files changed, 222 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43659dcf/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarContinuationExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarContinuationExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarContinuationExample.scala index 3ef0527..203f0b7 100644 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarContinuationExample.scala +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarContinuationExample.scala @@ -154,14 +154,16 @@ class FibonacciClosure ( jobCtx.callcc() // Resume job execution. } + // Hold (suspend) job execution. + // It will be resumed in listener above via 'callcc()' call + // once both futures are done. + jobCtx.holdcc() + // Attach the same listener to both futures. fut1.listen(lsnr) fut2.listen(lsnr) - // Hold (suspend) job execution. - // It will be resumed in listener above via 'callcc()' call - // once both futures are done. - return jobCtx.holdcc() + return null } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43659dcf/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContinuation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContinuation.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContinuation.java index 2014f10..cd91e2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContinuation.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContinuation.java @@ -49,6 +49,7 @@ public interface ComputeJobContinuation { * language. Basically, the job is held to be continued later, hence the name of the method. * * @return Always returns {@code null} for convenience to be used in code with return statement. + * @throws IllegalStateException If job has been already held before. */ @Nullable public T holdcc(); @@ -71,6 +72,7 @@ public interface ComputeJobContinuation { * * @param timeout Timeout in milliseconds after which job will be automatically resumed. * @return Always returns {@code null} for convenience to be used in code with return statement. + * @throws IllegalStateException If job has been already held before */ @Nullable public T holdcc(long timeout); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43659dcf/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java index 81f75a4..a1e0135 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java @@ -53,6 +53,9 @@ public class GridJobContextImpl implements ComputeJobContext, Externalizable { /** Job worker. */ private GridJobWorker job; + /** Current timeout object. */ + private volatile GridTimeoutObject timeoutObj; + /** Attributes mux. Do not use this as object is exposed to user. */ private final Object mux = new Object(); @@ -167,46 +170,62 @@ public class GridJobContextImpl implements ComputeJobContext, Externalizable { job = ctx.job().activeJob(jobId); // Completed? - if (job != null) { - job.hold(); + if (job != null && !job.isDone()) { + if (!job.hold()) + throw new IllegalStateException("Job has already been held [ctx=" + this + ']'); - if (timeout > 0 && !job.isDone()) { - final long endTime = U.currentTimeMillis() + timeout; + assert timeoutObj == null; - // Overflow. - if (endTime > 0) { - ctx.timeout().addTimeoutObject(new GridTimeoutObject() { - private final IgniteUuid id = IgniteUuid.randomUuid(); + if (timeout <= 0) + return null; - @Override public IgniteUuid timeoutId() { - return id; - } + final long endTime = U.currentTimeMillis() + timeout; - @Override public long endTime() { - return endTime; - } + // Overflow. + if (endTime > 0) { + timeoutObj = new GridTimeoutObject() { + private final IgniteUuid id = IgniteUuid.randomUuid();; - @Override public void onTimeout() { - try { - ExecutorService execSvc = job.isInternal() ? - ctx.getManagementExecutorService() : ctx.getExecutorService(); + @Override public IgniteUuid timeoutId() { + return id; + } - assert execSvc != null; + @Override public long endTime() { + return endTime; + } - execSvc.submit(new Runnable() { - @Override public void run() { - callcc(); - } - }); - } - catch (RejectedExecutionException e) { - U.error(log(), "Failed to execute job (will execute synchronously).", e); + @Override public void onTimeout() { + try { + synchronized (mux) { + GridTimeoutObject timeoutObj0 = timeoutObj; + + if (timeoutObj0 == null || timeoutObj0.timeoutId() != id) + // The timer was canceled by explicit callcc() call. + return; - callcc(); + timeoutObj = null; } + + ExecutorService execSvc = job.isInternal() ? + ctx.getManagementExecutorService() : ctx.getExecutorService(); + + assert execSvc != null; + + execSvc.submit(new Runnable() { + @Override public void run() { + callcc0(); + } + }); } - }); - } + catch (RejectedExecutionException e) { + U.error(log(), "Failed to execute job (will execute synchronously).", e); + + callcc0(); + } + } + }; + + ctx.timeout().addTimeoutObject(timeoutObj); } } } @@ -216,13 +235,32 @@ public class GridJobContextImpl implements ComputeJobContext, Externalizable { /** {@inheritDoc} */ @Override public void callcc() { + synchronized (mux) { + GridTimeoutObject timeoutObj0 = timeoutObj; + + if (timeoutObj0 != null) { + if (ctx != null) + ctx.timeout().removeTimeoutObject(timeoutObj0); + + timeoutObj = null; + } + } + + callcc0(); + } + + /** + * Unholds job. + */ + private void callcc0() { if (ctx != null) { if (job == null) job = ctx.job().activeJob(jobId); - if (job != null) + if (job != null) { // Execute in the same thread. job.execute(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43659dcf/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobHoldListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobHoldListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobHoldListener.java index 04d9042..3e1dfee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobHoldListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobHoldListener.java @@ -26,11 +26,13 @@ import java.util.*; interface GridJobHoldListener extends EventListener { /** * @param worker Held job worker. + * @return {@code True} if worker has been held. */ - public void onHold(GridJobWorker worker); + public boolean onHeld(GridJobWorker worker); /** * @param worker Unheld job worker. + * @return {@code True} if worker has been unheld. */ - public void onUnhold(GridJobWorker worker); + public boolean onUnheld(GridJobWorker worker); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43659dcf/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index 3344028..a13e170 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -1635,25 +1635,33 @@ public class GridJobProcessor extends GridProcessorAdapter { */ private class JobHoldListener implements GridJobHoldListener { /** {@inheritDoc} */ - @Override public void onHold(GridJobWorker worker) { + @Override public boolean onHeld(GridJobWorker worker) { if (log.isDebugEnabled()) - log.debug("Received onHold() callback [worker=" + worker + ']'); + log.debug("Received onHeld() callback [worker=" + worker + ']'); + + boolean res = false; if (activeJobs.containsKey(worker.getJobId())) { - heldJobs.add(worker.getJobId()); + res = heldJobs.add(worker.getJobId()); - if (!activeJobs.containsKey(worker.getJobId())) + if (!activeJobs.containsKey(worker.getJobId())) { heldJobs.remove(worker.getJobId()); + + // Job has been completed and therefore cannot be held. + res = false; + } } + + return res; } /** {@inheritDoc} */ - @Override public void onUnhold(GridJobWorker worker) { + @Override public boolean onUnheld(GridJobWorker worker) { if (log.isDebugEnabled()) - log.debug("Received onUnhold() callback [worker=" + worker + ", active=" + activeJobs + + log.debug("Received onUnheld() callback [worker=" + worker + ", active=" + activeJobs + ", held=" + heldJobs + ']'); - heldJobs.remove(worker.getJobId()); + return heldJobs.remove(worker.getJobId()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43659dcf/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index fdfc5c8..b691180 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -354,12 +354,15 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { /** * Sets halt flags. */ - public void hold() { - held.incrementAndGet(); - + public boolean hold() { HOLD.set(true); - holdLsnr.onHold(this); + boolean res; + + if (res = holdLsnr.onHeld(this)) + held.incrementAndGet(); + + return res; } /** @@ -431,7 +434,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { } /** - * @param skipNtf {@code True} to skip job processor {@code onUnhold()} + * @param skipNtf {@code True} to skip job processor {@code onUnheld()} * notification (only from {@link #body()}). */ private void execute0(boolean skipNtf) { @@ -443,13 +446,11 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { super.cancel(); if (!skipNtf) { - holdLsnr.onUnhold(this); - - int c = held.decrementAndGet(); - - if (c > 0) { + if (holdLsnr.onUnheld(this)) + held.decrementAndGet(); + else { if (log.isDebugEnabled()) - log.debug("Ignoring job execution (job was held several times) [c=" + c + ']'); + log.debug("Ignoring job execution (job was not held)."); return; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43659dcf/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java index 7c6bacc..d22dcec 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.resources.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; @@ -134,6 +135,119 @@ public class GridContinuousTaskSelfTest extends GridCommonAbstractTest { } } + /** + * @throws Exception If test failed. + */ + public void testClearTimeouts() throws Exception { + int holdccTimeout = 4000; + + try { + Ignite grid = startGrid(0); + + TestClearTimeoutsClosure closure = new TestClearTimeoutsClosure(); + + grid.compute().apply(closure, holdccTimeout); + + Thread.sleep(holdccTimeout * 2); + + assert closure.counter == 2; + } + finally { + stopGrid(0); + } + } + + /** + * @throws Exception If test failed. + */ + public void testMultipleHoldccCalls() throws Exception { + try { + Ignite grid = startGrid(0); + + assertTrue(grid.compute().apply(new TestMultipleHoldccCallsClosure(), (Object)null)); + } + finally { + stopGrid(0); + } + } + + /** */ + @SuppressWarnings({"PublicInnerClass"}) + public static class TestMultipleHoldccCallsClosure implements IgniteClosure { + /** */ + private int counter; + + /** */ + private volatile boolean success; + + /** Auto-inject job context. */ + @JobContextResource + private ComputeJobContext jobCtx; + + /** */ + @LoggerResource + private IgniteLogger log; + + @Override public Boolean apply(Object param) { + counter++; + + if (counter == 2) + return success; + + jobCtx.holdcc(4000); + + try { + jobCtx.holdcc(); + } + catch (IllegalStateException e) { + success = true; + log.info("Second holdcc() threw IllegalStateException as expected."); + } + finally { + new Timer().schedule(new TimerTask() { + @Override public void run() { + jobCtx.callcc(); + } + }, 1000); + } + + return false; + } + } + + /** */ + @SuppressWarnings({"PublicInnerClass"}) + public static class TestClearTimeoutsClosure implements IgniteClosure { + /** */ + private int counter; + + /** Auto-inject job context. */ + @JobContextResource + private ComputeJobContext jobCtx; + + @Override public Object apply(Integer holdccTimeout) { + assert holdccTimeout >= 2000; + + counter++; + + if (counter == 1) { + new Timer().schedule(new TimerTask() { + @Override public void run() { + jobCtx.callcc(); + } + }, 1000); + + jobCtx.holdcc(holdccTimeout); + } + + if (counter == 2) + // Job returned from the suspended state. + return null; + + return null; + } + } + /** */ @SuppressWarnings({"PublicInnerClass"}) public static class TestJobsChainTask implements ComputeTask {