Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2B1AE200D47 for ; Sat, 11 Nov 2017 01:47:14 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 29878160BF2; Sat, 11 Nov 2017 00:47:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EF0EB160BEE for ; Sat, 11 Nov 2017 01:47:12 +0100 (CET) Received: (qmail 85877 invoked by uid 500); 11 Nov 2017 00:47:12 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 85868 invoked by uid 99); 11 Nov 2017 00:47:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 11 Nov 2017 00:47:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F21A5DFAB0; Sat, 11 Nov 2017 00:47:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhiyuany@apache.org To: commits@tez.apache.org Message-Id: <6a7acc1270454128bc977c900a49cd2d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-3855. Allow vertex manager to send event to processor (zhiyuany) Date: Sat, 11 Nov 2017 00:47:10 +0000 (UTC) archived-at: Sat, 11 Nov 2017 00:47:14 -0000 Repository: tez Updated Branches: refs/heads/master a24652735 -> b96f79fa7 TEZ-3855. Allow vertex manager to send event to processor (zhiyuany) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b96f79fa Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b96f79fa Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b96f79fa Branch: refs/heads/master Commit: b96f79fa75dc6cf47e4d648b028ccb12f02308a6 Parents: a246527 Author: Zhiyuan Yang Authored: Fri Nov 10 16:44:29 2017 -0800 Committer: Zhiyuan Yang Committed: Fri Nov 10 16:44:29 2017 -0800 ---------------------------------------------------------------------- .../tez/dag/api/VertexManagerPluginContext.java | 13 ++++ .../api/events/CustomProcessorEvent.java | 65 ++++++++++++++++++++ tez-api/src/main/proto/Events.proto | 5 ++ .../apache/tez/dag/app/dag/impl/VertexImpl.java | 12 ++++ .../tez/dag/app/dag/impl/VertexManager.java | 26 ++++++++ .../tez/dag/app/dag/impl/TestVertexManager.java | 57 +++++++++++++++-- .../org/apache/tez/common/ProtoConverters.java | 18 ++++++ .../apache/tez/runtime/api/impl/EventType.java | 1 + .../apache/tez/runtime/api/impl/TezEvent.java | 15 +++++ 9 files changed, 208 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java index b858a65..b89b279 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java @@ -29,8 +29,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.InputSpecUpdate; import org.apache.tez.runtime.api.VertexStatistics; +import org.apache.tez.runtime.api.events.CustomProcessorEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import com.google.common.base.Preconditions; @@ -268,6 +270,17 @@ public interface VertexManagerPluginContext { * task to which events need to be sent. */ public void addRootInputEvents(String inputName, Collection events); + + /** + * Allows a VertexManagerPlugin to send events of custom payload to processor + * of a specific task of managed vertex + * + * It's up to user to make sure taskId is valid + * + * @param events events to be sent + * @param taskId id of a task of managed vertex + */ + public void sendEventToProcessor(Collection events, int taskId); @Deprecated /** http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-api/src/main/java/org/apache/tez/runtime/api/events/CustomProcessorEvent.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CustomProcessorEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CustomProcessorEvent.java new file mode 100644 index 0000000..9cfb02e --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CustomProcessorEvent.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.api.events; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.tez.runtime.api.Event; + +import java.nio.ByteBuffer; + +public class CustomProcessorEvent extends Event { + private ByteBuffer payload; + + /** + * Version number to indicate what app attempt generated this Event + */ + private int version; + + private CustomProcessorEvent(ByteBuffer payload) { + this(payload, -1); + } + + private CustomProcessorEvent(ByteBuffer payload, int version) { + this.payload = payload; + this.version = version; + } + + public static CustomProcessorEvent create(ByteBuffer payload) { + return new CustomProcessorEvent(payload); + } + + @Private + public static CustomProcessorEvent create(ByteBuffer payload, int version) { + return new CustomProcessorEvent(payload, version); + } + + public ByteBuffer getPayload() { + return payload; + } + + @Private + public void setVersion(int version) { + this.version = version; + } + + public int getVersion() { + return version; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-api/src/main/proto/Events.proto ---------------------------------------------------------------------- diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto index e018864..7123500 100644 --- a/tez-api/src/main/proto/Events.proto +++ b/tez-api/src/main/proto/Events.proto @@ -69,3 +69,8 @@ message RootInputInitializerEventProto { optional string target_input_name = 2; optional bytes user_payload = 3; } + +message CustomProcessorEventProto { + optional bytes user_payload = 1; + required int32 version = 2; +} http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index c329ec7..13cfb8f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -167,6 +167,7 @@ import org.apache.tez.runtime.api.OutputStatistics; import org.apache.tez.runtime.api.TaskAttemptIdentifier; import org.apache.tez.runtime.api.VertexStatistics; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; +import org.apache.tez.runtime.api.events.CustomProcessorEvent; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.InputFailedEvent; @@ -3884,6 +3885,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } EventMetaData sourceMeta = tezEvent.getSourceInfo(); switch(tezEvent.getEventType()) { + case CUSTOM_PROCESSOR_EVENT: + { + // set version as app attempt id + ((CustomProcessorEvent) tezEvent.getEvent()).setVersion( + appContext.getApplicationAttemptId().getAttemptId()); + // route event to task + EventMetaData destinationMeta = tezEvent.getDestinationInfo(); + Task targetTask = getTask(destinationMeta.getTaskAttemptID().getTaskID()); + targetTask.registerTezEvent(tezEvent); + } + break; case INPUT_FAILED_EVENT: case DATA_MOVEMENT_EVENT: case COMPOSITE_DATA_MOVEMENT_EVENT: http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index b7d3428..7a1547f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -36,6 +36,9 @@ import javax.annotation.Nullable; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.tez.dag.app.dag.event.DAGEventInternalError; +import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.events.CustomProcessorEvent; import org.apache.tez.runtime.api.impl.GroupInputSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -272,6 +275,29 @@ public class VertexManager { // Recovery handling is taken care of by the Vertex. } + @Override + public void sendEventToProcessor(Collection events, int taskId) { + checkAndThrowIfDone(); + Preconditions.checkArgument(taskId >= 0 && taskId < managedVertex.getTotalTasks(), + "Invalid taskId " + taskId + "; " + "There are " + managedVertex.getTotalTasks() + + " tasks in total."); + + if (events != null && events.size() > 0) { + List tezEvents = new ArrayList<>(); + for (CustomProcessorEvent event : events) { + TezEvent tezEvent = new TezEvent(event, null); + // use dummy task attempt id since this is not an task attempt specific event and task + // attempt id won't be used anyway + EventMetaData destinationMeta = new EventMetaData(EventProducerConsumerType.PROCESSOR, + managedVertex.getName(), managedVertex.getName(), + TezTaskAttemptID.getInstance(managedVertex.getTask(taskId).getTaskId(), -1)); + tezEvent.setDestinationInfo(destinationMeta); + tezEvents.add(tezEvent); + } + appContext.getEventHandler().handle( + new VertexEventRouteEvent(managedVertex.getVertexId(), tezEvents)); + } + } @Override public synchronized void setVertexLocationHint(VertexLocationHint locationHint) { http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java index 3d9f271..c850d68 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java @@ -18,14 +18,17 @@ package org.apache.tez.dag.app.dag.impl; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -56,8 +59,10 @@ import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.CallableEvent; import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation; +import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.CustomProcessorEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.api.impl.GroupInputSpec; @@ -214,10 +219,9 @@ public class TestVertexManager { @Test(timeout = 5000) public void testVMPluginCtxGetInputVertexGroup() throws Exception { VertexManager vm = - new VertexManager( - VertexManagerPluginDescriptor.create(CustomVertexManager.class - .getName()), UserGroupInformation.getCurrentUser(), - mockVertex, mockAppContext, mock(StateChangeNotifier.class)); + new VertexManager(VertexManagerPluginDescriptor.create(CustomVertexManager.class.getName()), + UserGroupInformation.getCurrentUser(), mockVertex, mockAppContext, + mock(StateChangeNotifier.class)); assertTrue(vm.pluginContext.getInputVertexGroups().isEmpty()); @@ -232,6 +236,51 @@ public class TestVertexManager { assertTrue(groups.get(group).contains(v2)); } + @Test(timeout = 5000) + public void testSendCustomProcessorEvent() throws Exception { + VertexManager vm = + new VertexManager(VertexManagerPluginDescriptor.create(CustomVertexManager.class.getName()), + UserGroupInformation.getCurrentUser(), mockVertex, mockAppContext, + mock(StateChangeNotifier.class)); + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(VertexEventRouteEvent.class); + + when(mockVertex.getTotalTasks()).thenReturn(2); + + List events = new ArrayList<>(); + // task id too small, should fail + try { + vm.pluginContext.sendEventToProcessor(events, -1); + fail("Should fail for invalid task id"); + } catch (IllegalArgumentException exception) { + assertTrue(exception.getMessage().contains("Invalid taskId")); + } + // task id too large, should fail + try { + vm.pluginContext.sendEventToProcessor(events, 10); + fail("Should fail for invalid task id"); + } catch (IllegalArgumentException exception) { + assertTrue(exception.getMessage().contains("Invalid taskId")); + } + + // null event, do nothing + vm.pluginContext.sendEventToProcessor(null, 0); + verify(mockHandler, never()).handle(requestCaptor.capture()); + + // empty event + vm.pluginContext.sendEventToProcessor(events, 1); + verify(mockHandler, never()).handle(requestCaptor.capture()); + + //events.add(); + byte[] payload = new byte[] {1,2,3}; + events.add(CustomProcessorEvent.create(ByteBuffer.wrap(payload))); + vm.pluginContext.sendEventToProcessor(events, 1); + verify(mockHandler, times(1)).handle(requestCaptor.capture()); + CustomProcessorEvent cpe = + (CustomProcessorEvent)(requestCaptor.getValue().getEvents().get(0).getEvent()); + assertArrayEquals(payload, cpe.getPayload().array()); + } + public static class CustomVertexManager extends VertexManagerPlugin { private Map> cachedEventMap = new HashMap>(); http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java index ea90158..e428570 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java @@ -21,6 +21,7 @@ package org.apache.tez.common; import com.google.protobuf.ByteString; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; +import org.apache.tez.runtime.api.events.CustomProcessorEvent; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent; import org.apache.tez.runtime.api.events.EventProtos; @@ -31,6 +32,23 @@ import org.apache.tez.runtime.api.events.EventProtos.VertexManagerEventProto; public class ProtoConverters { + public static EventProtos.CustomProcessorEventProto convertCustomProcessorEventToProto( + CustomProcessorEvent event) { + EventProtos.CustomProcessorEventProto.Builder builder = + EventProtos.CustomProcessorEventProto.newBuilder(); + if (event.getPayload() != null) { + builder.setUserPayload(ByteString.copyFrom(event.getPayload())); + } + builder.setVersion(event.getVersion()); + return builder.build(); + } + + public static CustomProcessorEvent convertCustomProcessorEventFromProto( + EventProtos.CustomProcessorEventProto proto) { + return CustomProcessorEvent.create(proto.getUserPayload() != null ? + proto.getUserPayload().asReadOnlyByteBuffer() : null, proto.getVersion()); + } + public static EventProtos.DataMovementEventProto convertDataMovementEventToProto( DataMovementEvent event) { EventProtos.DataMovementEventProto.Builder builder = http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java index e573526..7e365b1 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java @@ -31,4 +31,5 @@ public enum EventType { COMPOSITE_DATA_MOVEMENT_EVENT, ROOT_INPUT_INITIALIZER_EVENT, COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT, + CUSTOM_PROCESSOR_EVENT, } http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java index 1a90ada..e7af4a1 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java @@ -30,6 +30,7 @@ import org.apache.tez.common.TezConverterUtils; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; +import org.apache.tez.runtime.api.events.CustomProcessorEvent; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent; import org.apache.tez.runtime.api.events.EventProtos; @@ -57,6 +58,8 @@ import com.google.protobuf.AbstractMessage; import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; +import static org.apache.tez.runtime.api.events.EventProtos.*; + public class TezEvent implements Writable { private EventType eventType; @@ -82,6 +85,8 @@ public class TezEvent implements Writable { this.setSourceInfo(sourceInfo); if (event instanceof DataMovementEvent) { eventType = EventType.DATA_MOVEMENT_EVENT; + } else if (event instanceof CustomProcessorEvent) { + eventType = EventType.CUSTOM_PROCESSOR_EVENT; } else if (event instanceof CompositeDataMovementEvent) { eventType = EventType.COMPOSITE_DATA_MOVEMENT_EVENT; } else if (event instanceof CompositeRoutedDataMovementEvent) { @@ -157,6 +162,11 @@ public class TezEvent implements Writable { } else { AbstractMessage message; switch (eventType) { + case CUSTOM_PROCESSOR_EVENT: + message = + ProtoConverters.convertCustomProcessorEventToProto( + (CustomProcessorEvent) event); + break; case DATA_MOVEMENT_EVENT: message = ProtoConverters.convertDataMovementEventToProto( @@ -260,6 +270,11 @@ public class TezEvent implements Writable { } input = CodedInputStream.newInstance(eventBytes, startOffset, eventBytesLen); switch (eventType) { + case CUSTOM_PROCESSOR_EVENT: + CustomProcessorEventProto cpProto = + CustomProcessorEventProto.parseFrom(input); + event = ProtoConverters.convertCustomProcessorEventFromProto(cpProto); + break; case DATA_MOVEMENT_EVENT: DataMovementEventProto dmProto = DataMovementEventProto.parseFrom(input);