Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3B49910ACB for ; Sun, 6 Oct 2013 17:40:17 +0000 (UTC) Received: (qmail 94741 invoked by uid 500); 6 Oct 2013 17:40:16 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 94700 invoked by uid 500); 6 Oct 2013 17:40:13 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 94597 invoked by uid 99); 6 Oct 2013 17:40:12 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 06 Oct 2013 17:40:12 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id ECDB6910D04; Sun, 6 Oct 2013 17:40:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: <64da85ce8c584087b918e8a03d28a9fa@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-273: Make PipelineExecution implement ListenableFuture Date: Sun, 6 Oct 2013 17:40:11 +0000 (UTC) Updated Branches: refs/heads/master ecf5dd01c -> 7a8af2865 CRUNCH-273: Make PipelineExecution implement ListenableFuture Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/7a8af286 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/7a8af286 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/7a8af286 Branch: refs/heads/master Commit: 7a8af286529aeacb07bd7bc6daf313d5e3f31b49 Parents: ecf5dd0 Author: Josh Wills Authored: Thu Oct 3 00:31:58 2013 -0700 Committer: Josh Wills Committed: Sun Oct 6 10:37:24 2013 -0700 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/CancelJobsIT.java | 36 +++++++- .../org/apache/crunch/PipelineExecution.java | 4 +- .../org/apache/crunch/impl/mem/MemPipeline.java | 97 +++++++++++++------- .../apache/crunch/impl/mr/exec/MRExecutor.java | 42 ++++++++- 4 files changed, 140 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/7a8af286/crunch-core/src/it/java/org/apache/crunch/CancelJobsIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/CancelJobsIT.java b/crunch-core/src/it/java/org/apache/crunch/CancelJobsIT.java index ff01a2f..5fbca57 100644 --- a/crunch-core/src/it/java/org/apache/crunch/CancelJobsIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/CancelJobsIT.java @@ -18,6 +18,7 @@ package org.apache.crunch; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -45,7 +46,15 @@ public class CancelJobsIT { assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus()); assertEquals(2, pr.getStageResults().size()); } - + + @Test + public void testGet() throws Exception { + PipelineExecution pe = run(); + PipelineResult pr = pe.get(); + assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus()); + assertEquals(2, pr.getStageResults().size()); + } + @Test public void testKill() throws Exception { PipelineExecution pe = run(); @@ -55,6 +64,31 @@ public class CancelJobsIT { } @Test + public void testKillGet() throws Exception { + PipelineExecution pe = run(); + pe.kill(); + PipelineResult res = pe.get(); + assertFalse(res.succeeded()); + assertEquals(PipelineExecution.Status.KILLED, pe.getStatus()); + } + + @Test + public void testCancelNoInterrupt() throws Exception { + PipelineExecution pe = run(); + pe.cancel(false); + pe.waitUntilDone(); + assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus()); + } + + @Test + public void testCancelMayInterrupt() throws Exception { + PipelineExecution pe = run(); + pe.cancel(true); + pe.waitUntilDone(); + assertEquals(PipelineExecution.Status.KILLED, pe.getStatus()); + } + + @Test public void testKillMultipleTimes() throws Exception { PipelineExecution pe = run(); for (int i = 0; i < 10; i++) { http://git-wip-us.apache.org/repos/asf/crunch/blob/7a8af286/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java b/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java index fc6bb91..af6a177 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java +++ b/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java @@ -17,6 +17,8 @@ */ package org.apache.crunch; +import com.google.common.util.concurrent.ListenableFuture; + import java.util.concurrent.TimeUnit; /** @@ -24,7 +26,7 @@ import java.util.concurrent.TimeUnit; * * This interface is thread-safe. */ -public interface PipelineExecution { +public interface PipelineExecution extends ListenableFuture { enum Status { READY, RUNNING, SUCCEEDED, FAILED, KILLED } http://git-wip-us.apache.org/repos/asf/crunch/blob/7a8af286/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index 60677fc..cc9ad69 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -20,8 +20,11 @@ package org.apache.crunch.impl.mem; import java.io.IOException; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import com.google.common.util.concurrent.AbstractFuture; import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericContainer; @@ -306,44 +309,16 @@ public class MemPipeline implements Pipeline { @Override public PipelineExecution runAsync() { activeTargets.clear(); - return new PipelineExecution() { - @Override - public String getPlanDotFile() { - return ""; - } - - @Override - public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException { - // no-po - } - - @Override - public void waitUntilDone() throws InterruptedException { - // no-po - } - - @Override - public Status getStatus() { - return Status.SUCCEEDED; - } - - @Override - public PipelineResult getResult() { - return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS)), - Status.SUCCEEDED); - } - - @Override - public void kill() { - } - }; + return new MemExecution(); } @Override public PipelineResult run() { - activeTargets.clear(); - return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS)), - PipelineExecution.Status.SUCCEEDED); + try { + return runAsync().get(); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override @@ -360,4 +335,58 @@ public class MemPipeline implements Pipeline { public String getName() { return "Memory Pipeline"; } + + private static class MemExecution extends AbstractFuture implements PipelineExecution { + + private PipelineResult res; + + public MemExecution() { + this.res = new PipelineResult( + ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS)), + PipelineExecution.Status.SUCCEEDED); + } + + @Override + public String getPlanDotFile() { + return ""; + } + + @Override + public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException { + set(res); + } + + @Override + public void waitUntilDone() throws InterruptedException { + set(res); + } + + @Override + public PipelineResult get() throws ExecutionException, InterruptedException { + set(res); + return super.get(); + } + + @Override + public PipelineResult get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, + TimeoutException { + set(res); + return super.get(timeout, timeUnit); + } + + @Override + public Status getStatus() { + return isDone() ? Status.SUCCEEDED : Status.READY; + } + + @Override + public PipelineResult getResult() { + return isDone() ? res : null; + } + + @Override + public void kill() { + // No-op + } + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/7a8af286/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java index 532e37c..a655b23 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java @@ -19,6 +19,7 @@ package org.apache.crunch.impl.mr.exec; import com.google.common.base.Function; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AbstractFuture; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.PipelineResult; @@ -37,7 +38,9 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; /** @@ -49,7 +52,7 @@ import java.util.concurrent.atomic.AtomicReference; * * It is thread-safe. */ -public class MRExecutor implements MRPipelineExecution { +public class MRExecutor extends AbstractFuture implements MRPipelineExecution { private static final Log LOG = LogFactory.getLog(MRExecutor.class); @@ -62,6 +65,7 @@ public class MRExecutor implements MRPipelineExecution { private AtomicReference status = new AtomicReference(Status.READY); private PipelineResult result; private Thread monitorThread; + private boolean started; private String planDotFile; @@ -89,13 +93,17 @@ public class MRExecutor implements MRPipelineExecution { this.planDotFile = planDotFile; } - public MRPipelineExecution execute() { - monitorThread.start(); + public synchronized MRPipelineExecution execute() { + if (!started) { + monitorThread.start(); + started = true; + } return this; } /** Monitors running status. It is called in {@code MonitorThread}. */ private void monitorLoop() { + status.set(Status.RUNNING); try { while (killSignal.getCount() > 0 && !control.allFinished()) { control.pollJobStatusAndStartNewOnes(); @@ -150,12 +158,14 @@ public class MRExecutor implements MRPipelineExecution { status.set(Status.SUCCEEDED); } result = new PipelineResult(stages, status.get()); + set(result); } } catch (InterruptedException e) { throw new AssertionError(e); // Nobody should interrupt us. } catch (IOException e) { LOG.error("Pipeline failed due to exception", e); status.set(Status.FAILED); + setException(e); } finally { doneSignal.countDown(); } @@ -177,6 +187,23 @@ public class MRExecutor implements MRPipelineExecution { } @Override + public PipelineResult get() throws InterruptedException, ExecutionException { + if (getStatus() == Status.READY) { + execute(); + } + return super.get(); + } + + @Override + public PipelineResult get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, + ExecutionException { + if (getStatus() == Status.READY) { + execute(); + } + return super.get(timeout, unit); + } + + @Override public synchronized Status getStatus() { return status.get(); } @@ -191,6 +218,15 @@ public class MRExecutor implements MRPipelineExecution { killSignal.countDown(); } + @Override + protected void interruptTask() { + try { + kill(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + private static boolean isLocalMode() { Configuration conf = new Configuration(); // Try to handle MapReduce version 0.20 or 0.22