From commits-return-5859-archive-asf-public=cust-asf.ponee.io@tez.apache.org Thu Aug 9 23:40:14 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 4A95E18065B for ; Thu, 9 Aug 2018 23:40:13 +0200 (CEST) Received: (qmail 67542 invoked by uid 500); 9 Aug 2018 21:40:12 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 67529 invoked by uid 99); 9 Aug 2018 21:40:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Aug 2018 21:40:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 20858DFC7C; Thu, 9 Aug 2018 21:40:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gopalv@apache.org To: commits@tez.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-3974: Correctness regression of TEZ-955 in TEZ-2937 (Jaume Marhuenda, reviewed by Gopal V) Date: Thu, 9 Aug 2018 21:40:12 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master fe22f3276 -> 90c8195de TEZ-3974: Correctness regression of TEZ-955 in TEZ-2937 (Jaume Marhuenda, reviewed by Gopal V) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/90c8195d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/90c8195d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/90c8195d Branch: refs/heads/master Commit: 90c8195de0ddee9da3d4cd07519c8e287c85ad50 Parents: fe22f32 Author: Jaume Marhuenda Authored: Thu Aug 9 14:39:56 2018 -0700 Committer: Gopal V Committed: Thu Aug 9 14:39:56 2018 -0700 ---------------------------------------------------------------------- .../runtime/LogicalIOProcessorRuntimeTask.java | 25 +++- .../TestLogicalIOProcessorRuntimeTask.java | 114 ++++++++++++++++--- 2 files changed, 115 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/90c8195d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 5c2ab77..0ac916f 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -380,30 +380,43 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { "Can only run while in RUNNING state. Current: " + this.state); this.state.set(State.CLOSED); + + List> allCloseInputEvents = Lists.newArrayList(); // Close the Inputs. for (InputSpec inputSpec : inputSpecs) { String srcVertexName = inputSpec.getSourceVertexName(); initializedInputs.remove(srcVertexName); List closeInputEvents = ((InputFrameworkInterface)inputsMap.get(srcVertexName)).close(); - sendTaskGeneratedEvents(closeInputEvents, - EventProducerConsumerType.INPUT, taskSpec.getVertexName(), - srcVertexName, taskSpec.getTaskAttemptID()); + allCloseInputEvents.add(closeInputEvents); } + List> allCloseOutputEvents = Lists.newArrayList(); // Close the Outputs. for (OutputSpec outputSpec : outputSpecs) { String destVertexName = outputSpec.getDestinationVertexName(); initializedOutputs.remove(destVertexName); List closeOutputEvents = ((LogicalOutputFrameworkInterface)outputsMap.get(destVertexName)).close(); - sendTaskGeneratedEvents(closeOutputEvents, - EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), - destVertexName, taskSpec.getTaskAttemptID()); + allCloseOutputEvents.add(closeOutputEvents); } // Close the Processor. processorClosed = true; processor.close(); + for (int i = 0; i < allCloseInputEvents.size(); i++) { + String srcVertexName = inputSpecs.get(i).getSourceVertexName(); + sendTaskGeneratedEvents(allCloseInputEvents.get(i), + EventProducerConsumerType.INPUT, taskSpec.getVertexName(), + srcVertexName, taskSpec.getTaskAttemptID()); + } + + for (int i = 0; i < allCloseOutputEvents.size(); i++) { + String destVertexName = outputSpecs.get(i).getDestinationVertexName(); + sendTaskGeneratedEvents(allCloseOutputEvents.get(i), + EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), + destVertexName, taskSpec.getTaskAttemptID()); + } + } finally { setTaskDone(); // Clear the interrupt status since the task execution is done. http://git-wip-us.apache.org/repos/asf/tez/blob/90c8195d/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java index c1bb3a1..599f98f 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java @@ -25,6 +25,7 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -51,6 +52,7 @@ import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.api.Writer; +import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.OutputSpec; @@ -62,6 +64,7 @@ import org.junit.Test; import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; +import org.mockito.Mockito; public class TestLogicalIOProcessorRuntimeTask { @@ -77,10 +80,14 @@ public class TestLogicalIOProcessorRuntimeTask { ScalingAllocator.class.getName()); TezTaskAttemptID taId1 = createTaskAttemptID(vertexId, 1); - TaskSpec task1 = createTaskSpec(taId1, "dag1", "vertex1", 30); + TaskSpec task1 = createTaskSpec(taId1, "dag1", + "vertex1", 30, TestProcessor.class.getName(), + TestOutput.class.getName()); TezTaskAttemptID taId2 = createTaskAttemptID(vertexId, 2); - TaskSpec task2 = createTaskSpec(taId2, "dag2", "vertex1", 10); + TaskSpec task2 = createTaskSpec(taId2, "dag2", + "vertex1", 10, TestProcessor.class.getName(), + TestOutput.class.getName()); TezSharedExecutor sharedExecutor = new TezSharedExecutor(tezConf); LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null, @@ -142,6 +149,50 @@ public class TestLogicalIOProcessorRuntimeTask { } + /** + * We should expect no events being sent to the AM if an + * exception happens in the close method of the processor + */ + @Test + @SuppressWarnings("unchecked") + public void testExceptionHappensInClose() throws Exception { + TezDAGID dagId = createTezDagId(); + TezVertexID vertexId = createTezVertexId(dagId); + Map serviceConsumerMetadata = new HashMap<>(); + Multimap startedInputsMap = HashMultimap.create(); + TezUmbilical umbilical = mock(TezUmbilical.class); + TezConfiguration tezConf = new TezConfiguration(); + tezConf.set(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS, + ScalingAllocator.class.getName()); + + TezTaskAttemptID taId1 = createTaskAttemptID(vertexId, 1); + TaskSpec task1 = createTaskSpec(taId1, "dag1", "vertex1", 30, + FaultyTestProcessor.class.getName(), + TestOutputWithEvents.class.getName()); + + TezSharedExecutor sharedExecutor = new TezSharedExecutor(tezConf); + LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null, + umbilical, serviceConsumerMetadata, new HashMap(), startedInputsMap, null, + "", new ExecutionContextImpl("localhost"), Runtime.getRuntime().maxMemory(), true, + new DefaultHadoopShim(), sharedExecutor); + + try { + lio1.initialize(); + lio1.run(); + + try { + lio1.close(); + fail("RuntimeException should have been thrown"); + } catch (RuntimeException e) { + // No events should be sent thorught the umbilical protocol + Mockito.verify(umbilical, Mockito.never()).addEvents(Mockito.anyList()); + } + } finally { + sharedExecutor.shutdownNow(); + cleanupAndTest(lio1); + } + } + private void cleanupAndTest(LogicalIOProcessorRuntimeTask lio) throws InterruptedException { ProcessorContext procContext = lio.getProcessorContext(); @@ -175,7 +226,7 @@ public class TestLogicalIOProcessorRuntimeTask { assertEquals(0, lio.outputSpecs.size()); assertTrue(lio.groupInputSpecs == null || lio.groupInputSpecs.size() == 0); } - + assertEquals(0, lio.inputsMap.size()); assertEquals(0, lio.inputContextMap.size()); assertEquals(0, lio.outputsMap.size()); @@ -190,11 +241,12 @@ public class TestLogicalIOProcessorRuntimeTask { } private TaskSpec createTaskSpec(TezTaskAttemptID taskAttemptID, - String dagName, String vertexName, int parallelism) { - ProcessorDescriptor processorDesc = createProcessorDescriptor(); + String dagName, String vertexName, int parallelism, + String processorClassname, String outputClassName) { + ProcessorDescriptor processorDesc = createProcessorDescriptor(processorClassname); TaskSpec taskSpec = new TaskSpec(taskAttemptID, dagName, vertexName, parallelism, processorDesc, - createInputSpecList(), createOutputSpecList(), null, null); + createInputSpecList(), createOutputSpecList(outputClassName), null, null); return taskSpec; } @@ -204,14 +256,14 @@ public class TestLogicalIOProcessorRuntimeTask { return Lists.newArrayList(inputSpec); } - private List createOutputSpecList() { - OutputDescriptor outputtDesc = OutputDescriptor.create(TestOutput.class.getName()); + private List createOutputSpecList(String outputClassName) { + OutputDescriptor outputtDesc = OutputDescriptor.create(outputClassName); OutputSpec outputSpec = new OutputSpec("outedge", outputtDesc, 1); return Lists.newArrayList(outputSpec); } - private ProcessorDescriptor createProcessorDescriptor() { - ProcessorDescriptor desc = ProcessorDescriptor.create(TestProcessor.class.getName()); + private ProcessorDescriptor createProcessorDescriptor(String className) { + ProcessorDescriptor desc = ProcessorDescriptor.create(className); return desc; } @@ -248,15 +300,25 @@ public class TestLogicalIOProcessorRuntimeTask { getContext().notifyProgress(); } - @Override - public void handleEvents(List processorEvents) { - - } + @Override + public void handleEvents(List processorEvents) { + } - @Override - public void close() throws Exception { - - } + @Override + public void close() throws Exception { + } + + } + + public static class FaultyTestProcessor extends TestProcessor { + public FaultyTestProcessor(ProcessorContext context) { + super(context); + } + + @Override + public void close() throws Exception { + throw new RuntimeException(); + } } @@ -336,6 +398,22 @@ public class TestLogicalIOProcessorRuntimeTask { public List close() throws Exception { return null; } + } + + public static class TestOutputWithEvents extends TestOutput { + + public static volatile int startCount = 0; + public static volatile int vertexParallelism; + + public TestOutputWithEvents(OutputContext outputContext, int numPhysicalOutputs) { + super(outputContext, numPhysicalOutputs); + } + @Override + public List close() throws Exception { + return Arrays.asList( + CompositeDataMovementEvent.create(0, + 0, null)); + } } }