Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2AB6E17C80 for ; Tue, 21 Oct 2014 21:33:26 +0000 (UTC) Received: (qmail 63305 invoked by uid 500); 21 Oct 2014 21:33:26 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 63270 invoked by uid 500); 21 Oct 2014 21:33:26 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 63261 invoked by uid 99); 21 Oct 2014 21:33:26 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Oct 2014 21:33:26 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id BEFE793BFD3; Tue, 21 Oct 2014 21:33:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-1667. Add a system test for InitializerEvents. (sseth) (cherry picked from commit 99f2b8ee94cd9ed551fa380ed0411b4cc69f6d77) Date: Tue, 21 Oct 2014 21:33:25 +0000 (UTC) Repository: tez Updated Branches: refs/heads/branch-0.5 2edbebaac -> 8a84fd97b TEZ-1667. Add a system test for InitializerEvents. 
(sseth) (cherry picked from commit 99f2b8ee94cd9ed551fa380ed0411b4cc69f6d77) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8a84fd97 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8a84fd97 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8a84fd97 Branch: refs/heads/branch-0.5 Commit: 8a84fd97b7ec71d4ac6fdeed2ae9334c0425ac0c Parents: 2edbeba Author: Siddharth Seth Authored: Tue Oct 21 14:32:44 2014 -0700 Committer: Siddharth Seth Committed: Tue Oct 21 14:33:15 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../api/events/InputInitializerEvent.java | 6 + .../java/org/apache/tez/test/TestTezJobs.java | 138 +++++++++++++++++++ 3 files changed, 145 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/8a84fd97/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 44ed877..753cb3e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -38,6 +38,7 @@ ALL CHANGES: TEZ-1584. Restore counters from DAGFinishedEvent when DAG is completed. TEZ-1525. BroadcastLoadGen testcase. TEZ-1686. TestRecoveryParser.testGetLastCompletedDAG fails sometimes + TEZ-1667. Add a system test for InitializerEvents. 
Release 0.5.1: 2014-10-02 http://git-wip-us.apache.org/repos/asf/tez/blob/8a84fd97/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java index 8360447..3037e61 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java @@ -111,4 +111,10 @@ public class InputInitializerEvent extends Event { public String getSourceVertexName() { return this.sourceVertexName; } + + @Override + public String toString() { + return "[TargetVertexName=" + targetVertexName + ", TargetInputName=" + targetInputName + + ", SourceVertexName=" + sourceVertexName + " Payload=" + eventPayload + "]"; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/8a84fd97/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index 484ca7e..80110b8 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -28,10 +28,17 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; 
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -46,15 +53,31 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.DataSourceDescriptor; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; +import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.examples.OrderedWordCount; import org.apache.tez.examples.SimpleSessionExample; import org.apache.tez.examples.JoinDataGen; import org.apache.tez.examples.HashJoinExample; import org.apache.tez.examples.JoinValidate; import org.apache.tez.examples.SortMergeJoinExample; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.InputInitializer; +import org.apache.tez.runtime.api.InputInitializerContext; +import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.api.events.InputInitializerEvent; +import org.apache.tez.runtime.library.processor.SimpleProcessor; +import org.apache.tez.runtime.library.processor.SleepProcessor; +import org.apache.tez.test.dag.MultiAttemptDAG; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -536,4 +559,119 @@ public class TestTezJobs { } } } + + @Test(timeout = 60000) + public void testInputInitializerEvents() throws TezException, InterruptedException, IOException { + + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + TezClient tezClient = 
TezClient.create("TestInputInitializerEvents", tezConf); + tezClient.start(); + + try { + DAG dag = DAG.create("TestInputInitializerEvents"); + Vertex vertex1 = Vertex.create(VERTEX_WITH_INITIALIZER_NAME, ProcessorDescriptor.create( + SleepProcessor.class.getName()) + .setUserPayload(new SleepProcessor.SleepProcessorConfig(1).toUserPayload()), 1) + .addDataSource(INPUT1_NAME, + DataSourceDescriptor + .create(InputDescriptor.create(MultiAttemptDAG.NoOpInput.class.getName()), + InputInitializerDescriptor.create(InputInitializerForTest.class.getName()), + null)); + Vertex vertex2 = Vertex.create(EVENT_GENERATING_VERTEX_NAME, + ProcessorDescriptor.create(InputInitializerEventGeneratingProcessor.class.getName()), 5); + + dag.addVertex(vertex1).addVertex(vertex2); + + DAGClient dagClient = tezClient.submitDAG(dag); + dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState()); + } finally { + tezClient.stop(); + } + } + + private static final String VERTEX_WITH_INITIALIZER_NAME = "VertexWithInitializer"; + private static final String EVENT_GENERATING_VERTEX_NAME = "EventGeneratingVertex"; + private static final String INPUT1_NAME = "Input1"; + + public static class InputInitializerEventGeneratingProcessor extends SimpleProcessor { + + public InputInitializerEventGeneratingProcessor( + ProcessorContext context) { + super(context); + } + + @Override + public void run() throws Exception { + if (getContext().getTaskIndex() == 1 && getContext().getTaskAttemptNumber() == 0) { + throw new IOException("Failing task 2, attempt 0"); + } + + InputInitializerEvent initializerEvent = InputInitializerEvent.create( + VERTEX_WITH_INITIALIZER_NAME, INPUT1_NAME, + ByteBuffer.allocate(4).putInt(0, getContext().getTaskIndex())); + List<Event> events = Lists.newArrayList(); + events.add(initializerEvent); + getContext().sendEvents(events); + } + } + + public static class InputInitializerForTest extends InputInitializer { + + private final 
ReentrantLock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + private final BitSet eventsSeen = new BitSet(); + + public InputInitializerForTest( + InputInitializerContext initializerContext) { + super(initializerContext); + } + + @Override + public List<Event> initialize() throws Exception { + getContext().registerForVertexStateUpdates(EVENT_GENERATING_VERTEX_NAME, EnumSet.of( + VertexState.SUCCEEDED)); + lock.lock(); + try { + condition.await(); + } finally { + lock.unlock(); + } + return null; + } + + + @Override + public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception { + lock.lock(); + try { + for (InputInitializerEvent event : events) { + Preconditions.checkArgument( + event.getSourceVertexName().equals(EVENT_GENERATING_VERTEX_NAME)); + int index = event.getUserPayload().getInt(0); + Preconditions.checkState(!eventsSeen.get(index)); + eventsSeen.set(index); + } + } finally { + lock.unlock(); + } + } + + @Override + public void onVertexStateUpdated(VertexStateUpdate stateUpdate) { + lock.lock(); + try { + Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.SUCCEEDED); + if (eventsSeen.cardinality() == + getContext().getVertexNumTasks(EVENT_GENERATING_VERTEX_NAME)) { + condition.signal(); + } else { + throw new IllegalStateException( + "Received VertexState SUCCEEDED before receiving all InputInitializerEvents"); + } + } finally { + lock.unlock(); + } + } + } }