apex-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXCORE-3) Ability for an operator to populate DAG at launch time
Date Mon, 21 Dec 2015 16:54:46 GMT

    [ https://issues.apache.org/jira/browse/APEXCORE-3?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15066697#comment-15066697 ]

ASF GitHub Bot commented on APEXCORE-3:
---------------------------------------

Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/189#discussion_r48165286
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---
    @@ -1116,13 +1320,78 @@ public StreamMeta addStream(String id)
       public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks)
       {
         StreamMeta s = addStream(id);
    -    s.setSource(source);
    -    for (Operator.InputPort<?> sink: sinks) {
    -      s.addSink(sink);
    +    id = s.id;
    +    ArrayListMultimap<Operator.OutputPort<?>, Operator.InputPort<?>> streamMap = ArrayListMultimap.create();
    +    if (!(source instanceof ProxyOutputPort)) {
    +      s.setSource(source);
    +    }
    +    for (Operator.InputPort<?> sink : sinks) {
    +      if (source instanceof ProxyOutputPort || sink instanceof ProxyInputPort) {
    +        streamMap.put(source, sink);
    +        streamLinks.put(id, streamMap);
    +      } else {
    +        if (s.getSource() == null) {
    +          s.setSource(source);
    +        }
    +        s.addSink(sink);
    +      }
         }
         return s;
       }
     
    +  /**
    +   * This will be called once the Logical Dag is expanded, and the proxy input and proxy output ports are populated with the actual ports that they refer to
    +   * This method adds sources and sinks for the StreamMeta objects which were left empty in the addStream call.
    +   */
    +  public void applyStreamLinks()
    +  {
    +    for (String id : streamLinks.keySet()) {
    +      StreamMeta s = getStream(id);
    +      for (Map.Entry<Operator.OutputPort<?>, Operator.InputPort<?>> pair : streamLinks.get(id).entries()) {
    +        if (s.getSource() == null) {
    +          Operator.OutputPort<?> outputPort = pair.getKey();
    +          while (outputPort instanceof ProxyOutputPort) {
    +            outputPort = ((ProxyOutputPort<?>)outputPort).get();
    +          }
    +          s.setSource(outputPort);
    +        }
    +
    +        Operator.InputPort<?> inputPort = pair.getValue();
    +        while (inputPort instanceof ProxyInputPort) {
    +          inputPort = ((ProxyInputPort<?>)inputPort).get();
    +        }
    +        s.addSink(inputPort);
    +      }
    +    }
    +  }
    +
    +  @SuppressWarnings({ "unchecked", "rawtypes" })
    +  private void addDAGToCurrentDAG(ModuleMeta moduleMeta)
    +  {
    +    LogicalPlan subDag = moduleMeta.getDag();
    +    String subDAGName = moduleMeta.getName();
    +    String name;
    +    for (OperatorMeta operatorMeta : subDag.getAllOperators()) {
    +      name = subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName();
    +      this.addOperator(name, operatorMeta.getOperator());
    +      OperatorMeta operatorMetaNew = this.getOperatorMeta(name);
    +      operatorMetaNew.setModuleName(operatorMeta.getModuleName() == null ? subDAGName : subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getModuleName());
    +    }
    +
    +    for (StreamMeta streamMeta : subDag.getAllStreams()) {
    +      OutputPortMeta sourceMeta = streamMeta.getSource();
    +      List<InputPort<?>> ports = new LinkedList<>();
    +      for (InputPortMeta inputPortMeta : streamMeta.getSinks()) {
    +        ports.add(inputPortMeta.getPortObject());
    +      }
    +      InputPort[] inputPorts = ports.toArray(new InputPort[]{});
    +
    +      name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName();
    +      StreamMeta streamMetaNew = this.addStream(name, sourceMeta.getPortObject(), inputPorts);
    +      streamMetaNew.setModuleName(streamMeta.getModuleName() == null ? subDAGName : subDAGName + "_" + streamMeta.getModuleName());
    --- End diff --
    
    Shouldn't the module name be the same for all operators and streams? Why construct it in the loops?
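
For reference, the ternary that both loops in the quoted diff use to build the module name can be read as the small helper below. This is only a sketch: `ModuleNameSketch`, `qualify`, and the `separator` parameter are hypothetical names, and the real value of `MODULE_NAMESPACE_SEPARATOR` is not shown in the diff, so the `"$"` used in `main` is an assumption.

```java
// Hypothetical helper, not part of LogicalPlan; it only mirrors the ternary in the diff.
final class ModuleNameSketch
{
  /**
   * Returns subDagName when the element has no module name of its own,
   * otherwise subDagName + separator + innerModuleName.
   */
  static String qualify(String subDagName, String innerModuleName, String separator)
  {
    return innerModuleName == null ? subDagName : subDagName + separator + innerModuleName;
  }

  public static void main(String[] args)
  {
    // "$" is an assumed separator, used only for illustration.
    System.out.println(qualify("M", null, "$"));
    System.out.println(qualify("M", "Inner", "$"));
  }
}
```

Read this way, the result differs between elements only when an element of the sub-DAG already carries a module name of its own; otherwise it is the sub-DAG name for every operator and stream.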


> Ability for an operator to populate DAG at launch time
> ------------------------------------------------------
>
>                 Key: APEXCORE-3
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-3
>             Project: Apache Apex Core
>          Issue Type: New Feature
>            Reporter: Amol Kekre
>            Assignee: Vlad Rozov
>            Priority: Critical
>
> Apex should have an operator API that lets the operator generate DAG during launch time. This will mean the following
> - Logical DAG will have one operator. This is the operator that will generate a DAG underneath
> - Physical plan will have the DAG generated by the operator
> - Execution plan will mimic physical plan + container location etc.
> For example, let's say we have three operators in a DAG (app): A->B->C
> If B generates a DAG B1->B2->B3 at launch time, then the physical plan will be
> A->B1->B2->B3->C
> This should work irrespective of the number of ports, etc. It is a typical flattening. The operators inside of B (B1, B2, B3) should have properties and attributes just like any other operator. Users should be able to access these at run time and compile time. B itself should support properties and attributes that B1, B2, B3 can inherit from.
> This is a very critical feature as it will allow users to plug in their own engines and still get complete operability support from the Apex engine.
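
The flattening described in the issue can be illustrated with a minimal sketch. It is not the Apex API: `FlattenSketch` and `flatten` are hypothetical names, operators are modeled as plain strings, and only a linear chain with single ports is handled (the issue asks for the general multi-port case). The sketch also assumes that sub-DAGs do not refer back to themselves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these types exist in Apex.
final class FlattenSketch
{
  /**
   * Replaces every operator that has a sub-DAG with the operators of that
   * sub-DAG, keeping the order of the surrounding operators.
   */
  static List<String> flatten(List<String> logical, Map<String, List<String>> subDags)
  {
    List<String> physical = new ArrayList<>();
    for (String op : logical) {
      List<String> expansion = subDags.get(op);
      if (expansion == null) {
        // Plain operator: carried over unchanged.
        physical.add(op);
      } else {
        // Recurse so that nested sub-DAGs are flattened as well.
        physical.addAll(flatten(expansion, subDags));
      }
    }
    return physical;
  }

  public static void main(String[] args)
  {
    Map<String, List<String>> subDags = new HashMap<>();
    subDags.put("B", Arrays.asList("B1", "B2", "B3"));
    List<String> physical = flatten(Arrays.asList("A", "B", "C"), subDags);
    System.out.println(String.join("->", physical)); // prints A->B1->B2->B3->C
  }
}
```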



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
