tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhiyu...@apache.org
Subject tez git commit: TEZ-3855. Allow vertex manager to send event to processor (zhiyuany)
Date Sat, 11 Nov 2017 00:47:10 GMT
Repository: tez
Updated Branches:
  refs/heads/master a24652735 -> b96f79fa7


TEZ-3855. Allow vertex manager to send event to processor (zhiyuany)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b96f79fa
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b96f79fa
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b96f79fa

Branch: refs/heads/master
Commit: b96f79fa75dc6cf47e4d648b028ccb12f02308a6
Parents: a246527
Author: Zhiyuan Yang <zhiyuany@apache.org>
Authored: Fri Nov 10 16:44:29 2017 -0800
Committer: Zhiyuan Yang <zhiyuany@apache.org>
Committed: Fri Nov 10 16:44:29 2017 -0800

----------------------------------------------------------------------
 .../tez/dag/api/VertexManagerPluginContext.java | 13 ++++
 .../api/events/CustomProcessorEvent.java        | 65 ++++++++++++++++++++
 tez-api/src/main/proto/Events.proto             |  5 ++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 12 ++++
 .../tez/dag/app/dag/impl/VertexManager.java     | 26 ++++++++
 .../tez/dag/app/dag/impl/TestVertexManager.java | 57 +++++++++++++++--
 .../org/apache/tez/common/ProtoConverters.java  | 18 ++++++
 .../apache/tez/runtime/api/impl/EventType.java  |  1 +
 .../apache/tez/runtime/api/impl/TezEvent.java   | 15 +++++
 9 files changed, 208 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index b858a65..b89b279 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -29,8 +29,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputSpecUpdate;
 import org.apache.tez.runtime.api.VertexStatistics;
+import org.apache.tez.runtime.api.events.CustomProcessorEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 
 import com.google.common.base.Preconditions;
@@ -268,6 +270,17 @@ public interface VertexManagerPluginContext {
    *          task to which events need to be sent.
    */
   public void addRootInputEvents(String inputName, Collection<InputDataInformationEvent>
events);
+
+  /**
+   * Allows a VertexManagerPlugin to send events of custom payload to processor
+   * of a specific task of managed vertex
+   *
+   * It's up to user to make sure taskId is valid
+   *
+   * @param events events to be sent
+   * @param taskId id of a task of managed vertex
+   */
+  public void sendEventToProcessor(Collection<CustomProcessorEvent> events, int taskId);
   
   @Deprecated
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-api/src/main/java/org/apache/tez/runtime/api/events/CustomProcessorEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/CustomProcessorEvent.java
b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CustomProcessorEvent.java
new file mode 100644
index 0000000..9cfb02e
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/CustomProcessorEvent.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.events;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.runtime.api.Event;
+
+import java.nio.ByteBuffer;
+
+public class CustomProcessorEvent extends Event {
+  private ByteBuffer payload;
+
+  /**
+   * Version number to indicate what app attempt generated this Event
+   */
+  private int version;
+
+  private CustomProcessorEvent(ByteBuffer payload) {
+    this(payload, -1);
+  }
+
+  private CustomProcessorEvent(ByteBuffer payload, int version) {
+    this.payload = payload;
+    this.version = version;
+  }
+
+  public static CustomProcessorEvent create(ByteBuffer payload) {
+    return new CustomProcessorEvent(payload);
+  }
+
+  @Private
+  public static CustomProcessorEvent create(ByteBuffer payload, int version) {
+    return new CustomProcessorEvent(payload, version);
+  }
+
+  public ByteBuffer getPayload() {
+    return payload;
+  }
+
+  @Private
+  public void setVersion(int version) {
+    this.version = version;
+  }
+
+  public int getVersion() {
+    return version;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto
index e018864..7123500 100644
--- a/tez-api/src/main/proto/Events.proto
+++ b/tez-api/src/main/proto/Events.proto
@@ -69,3 +69,8 @@ message RootInputInitializerEventProto {
   optional string target_input_name = 2;
   optional bytes user_payload = 3;
 }
+
+message CustomProcessorEventProto {
+  optional bytes user_payload = 1;
+  required int32 version = 2;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c329ec7..13cfb8f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -167,6 +167,7 @@ import org.apache.tez.runtime.api.OutputStatistics;
 import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.CustomProcessorEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
@@ -3884,6 +3885,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EventHandl
       }
       EventMetaData sourceMeta = tezEvent.getSourceInfo();
       switch(tezEvent.getEventType()) {
+      case CUSTOM_PROCESSOR_EVENT:
+        {
+          // set version as app attempt id
+          ((CustomProcessorEvent) tezEvent.getEvent()).setVersion(
+            appContext.getApplicationAttemptId().getAttemptId());
+          // route event to task
+          EventMetaData destinationMeta = tezEvent.getDestinationInfo();
+          Task targetTask = getTask(destinationMeta.getTaskAttemptID().getTaskID());
+          targetTask.registerTezEvent(tezEvent);
+        }
+        break;
       case INPUT_FAILED_EVENT:
       case DATA_MOVEMENT_EVENT:
       case COMPOSITE_DATA_MOVEMENT_EVENT:

http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index b7d3428..7a1547f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -36,6 +36,9 @@ import javax.annotation.Nullable;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.events.CustomProcessorEvent;
 import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -272,6 +275,29 @@ public class VertexManager {
       // Recovery handling is taken care of by the Vertex.
     }
 
+    @Override
+    public void sendEventToProcessor(Collection<CustomProcessorEvent> events, int taskId)
{
+      checkAndThrowIfDone();
+      Preconditions.checkArgument(taskId >= 0 && taskId < managedVertex.getTotalTasks(),
+        "Invalid taskId " + taskId + "; " + "There are " + managedVertex.getTotalTasks()
+          + " tasks in total.");
+
+      if (events != null && events.size() > 0) {
+        List<TezEvent> tezEvents = new ArrayList<>();
+        for (CustomProcessorEvent event : events) {
+          TezEvent tezEvent = new TezEvent(event, null);
+          // use dummy task attempt id since this is not an task attempt specific event and
task
+          // attempt id won't be used anyway
+          EventMetaData destinationMeta = new EventMetaData(EventProducerConsumerType.PROCESSOR,
+            managedVertex.getName(), managedVertex.getName(),
+            TezTaskAttemptID.getInstance(managedVertex.getTask(taskId).getTaskId(), -1));
+          tezEvent.setDestinationInfo(destinationMeta);
+          tezEvents.add(tezEvent);
+        }
+        appContext.getEventHandler().handle(
+          new VertexEventRouteEvent(managedVertex.getVertexId(), tezEvents));
+      }
+    }
 
     @Override
     public synchronized void setVertexLocationHint(VertexLocationHint locationHint) {

http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
index 3d9f271..c850d68 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
@@ -18,14 +18,17 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -56,8 +59,10 @@ import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.CallableEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.CustomProcessorEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.api.impl.GroupInputSpec;
@@ -214,10 +219,9 @@ public class TestVertexManager {
   @Test(timeout = 5000)
   public void testVMPluginCtxGetInputVertexGroup() throws Exception {
     VertexManager vm =
-      new VertexManager(
-        VertexManagerPluginDescriptor.create(CustomVertexManager.class
-          .getName()), UserGroupInformation.getCurrentUser(),
-        mockVertex, mockAppContext, mock(StateChangeNotifier.class));
+      new VertexManager(VertexManagerPluginDescriptor.create(CustomVertexManager.class.getName()),
+        UserGroupInformation.getCurrentUser(), mockVertex, mockAppContext,
+        mock(StateChangeNotifier.class));
 
     assertTrue(vm.pluginContext.getInputVertexGroups().isEmpty());
 
@@ -232,6 +236,51 @@ public class TestVertexManager {
     assertTrue(groups.get(group).contains(v2));
   }
 
+  @Test(timeout = 5000)
+  public void testSendCustomProcessorEvent() throws Exception {
+    VertexManager vm =
+      new VertexManager(VertexManagerPluginDescriptor.create(CustomVertexManager.class.getName()),
+        UserGroupInformation.getCurrentUser(), mockVertex, mockAppContext,
+        mock(StateChangeNotifier.class));
+    ArgumentCaptor<VertexEventRouteEvent> requestCaptor =
+      ArgumentCaptor.forClass(VertexEventRouteEvent.class);
+
+    when(mockVertex.getTotalTasks()).thenReturn(2);
+
+    List<CustomProcessorEvent> events = new ArrayList<>();
+    // task id too small, should fail
+    try {
+      vm.pluginContext.sendEventToProcessor(events, -1);
+      fail("Should fail for invalid task id");
+    } catch (IllegalArgumentException exception) {
+      assertTrue(exception.getMessage().contains("Invalid taskId"));
+    }
+    // task id too large, should fail
+    try {
+      vm.pluginContext.sendEventToProcessor(events, 10);
+      fail("Should fail for invalid task id");
+    } catch (IllegalArgumentException exception) {
+      assertTrue(exception.getMessage().contains("Invalid taskId"));
+    }
+
+    // null event, do nothing
+    vm.pluginContext.sendEventToProcessor(null, 0);
+    verify(mockHandler, never()).handle(requestCaptor.capture());
+
+    // empty event
+    vm.pluginContext.sendEventToProcessor(events, 1);
+    verify(mockHandler, never()).handle(requestCaptor.capture());
+
+    //events.add();
+    byte[] payload = new byte[] {1,2,3};
+    events.add(CustomProcessorEvent.create(ByteBuffer.wrap(payload)));
+    vm.pluginContext.sendEventToProcessor(events, 1);
+    verify(mockHandler, times(1)).handle(requestCaptor.capture());
+    CustomProcessorEvent cpe =
+      (CustomProcessorEvent)(requestCaptor.getValue().getEvents().get(0).getEvent());
+    assertArrayEquals(payload, cpe.getPayload().array());
+  }
+
   public static class CustomVertexManager extends VertexManagerPlugin {
 
     private Map<String,List<Event>> cachedEventMap = new HashMap<String, List<Event>>();

http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
index ea90158..e428570 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
@@ -21,6 +21,7 @@ package org.apache.tez.common;
 import com.google.protobuf.ByteString;
 
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.CustomProcessorEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
 import org.apache.tez.runtime.api.events.EventProtos;
@@ -31,6 +32,23 @@ import org.apache.tez.runtime.api.events.EventProtos.VertexManagerEventProto;
 
 public class ProtoConverters {
 
+  public static EventProtos.CustomProcessorEventProto convertCustomProcessorEventToProto(
+    CustomProcessorEvent event) {
+    EventProtos.CustomProcessorEventProto.Builder builder =
+        EventProtos.CustomProcessorEventProto.newBuilder();
+    if (event.getPayload() != null) {
+      builder.setUserPayload(ByteString.copyFrom(event.getPayload()));
+    }
+    builder.setVersion(event.getVersion());
+    return builder.build();
+  }
+
+  public static CustomProcessorEvent convertCustomProcessorEventFromProto(
+    EventProtos.CustomProcessorEventProto proto) {
+    return CustomProcessorEvent.create(proto.getUserPayload() != null ?
+      proto.getUserPayload().asReadOnlyByteBuffer() : null, proto.getVersion());
+  }
+
   public static EventProtos.DataMovementEventProto convertDataMovementEventToProto(
       DataMovementEvent event) {
     EventProtos.DataMovementEventProto.Builder builder =

http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
index e573526..7e365b1 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
@@ -31,4 +31,5 @@ public enum EventType {
   COMPOSITE_DATA_MOVEMENT_EVENT,
   ROOT_INPUT_INITIALIZER_EVENT,
   COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT,
+  CUSTOM_PROCESSOR_EVENT,
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/b96f79fa/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index 1a90ada..e7af4a1 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -30,6 +30,7 @@ import org.apache.tez.common.TezConverterUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.CustomProcessorEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
 import org.apache.tez.runtime.api.events.EventProtos;
@@ -57,6 +58,8 @@ import com.google.protobuf.AbstractMessage;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
 
+import static org.apache.tez.runtime.api.events.EventProtos.*;
+
 public class TezEvent implements Writable {
 
   private EventType eventType;
@@ -82,6 +85,8 @@ public class TezEvent implements Writable {
     this.setSourceInfo(sourceInfo);
     if (event instanceof DataMovementEvent) {
       eventType = EventType.DATA_MOVEMENT_EVENT;
+    } else if (event instanceof CustomProcessorEvent) {
+      eventType = EventType.CUSTOM_PROCESSOR_EVENT;
     } else if (event instanceof CompositeDataMovementEvent) {
       eventType = EventType.COMPOSITE_DATA_MOVEMENT_EVENT;
     } else if (event instanceof CompositeRoutedDataMovementEvent) {
@@ -157,6 +162,11 @@ public class TezEvent implements Writable {
     } else {
       AbstractMessage message;
       switch (eventType) {
+      case CUSTOM_PROCESSOR_EVENT:
+        message =
+            ProtoConverters.convertCustomProcessorEventToProto(
+                (CustomProcessorEvent) event);
+        break;
       case DATA_MOVEMENT_EVENT:
         message =
             ProtoConverters.convertDataMovementEventToProto(
@@ -260,6 +270,11 @@ public class TezEvent implements Writable {
       }
       input = CodedInputStream.newInstance(eventBytes, startOffset, eventBytesLen);
       switch (eventType) {
+      case CUSTOM_PROCESSOR_EVENT:
+        CustomProcessorEventProto cpProto =
+            CustomProcessorEventProto.parseFrom(input);
+        event = ProtoConverters.convertCustomProcessorEventFromProto(cpProto);
+        break;
       case DATA_MOVEMENT_EVENT:
         DataMovementEventProto dmProto =
             DataMovementEventProto.parseFrom(input);


Mime
View raw message