apex-dev mailing list archives

From Bhupesh Chawda <bhup...@datatorrent.com>
Subject Re: [APEX-3] Sub task - Add support for ProxyPorts in Modules - APEX-194
Date Thu, 15 Oct 2015 08:22:34 GMT
Hi All,

Here is an example of how we can use proxy ports when using modules.


public class TestModule implements Module

  // Proxy Input Port
  public transient ProxyInputPort<Integer> mInput = new

  // Proxy Output Port for Even Numbers
  public transient ProxyOutputPort<Integer> mOutputEven = new

  // Proxy Output Port for Odd Numbers
  public transient ProxyOutputPort<Integer> mOutputOdd = new

  public void populateDAG(DAG dag, Configuration conf) // populateDag() for
    // Operator inside the module. Splits the input stream (integers) into
    // two streams, one for odd numbers and one for even numbers.
    OddEvenOperator moduleOperator = dag.addOperator("TestModule", new

    // Set the actual InputPort and OutputPort objects inside the proxy input
    // and proxy output ports.
    // This is evaluated when the DAG is expanded. Once that is done,
    // the engine can replace the proxy port streams with the actual ports.
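
To make the wiring concrete, here is a minimal, self-contained sketch of the same idea. It does not use the Apex API: ProxyPort, InputPort, OutputPort, ProxyInputPort, ProxyOutputPort, OddEvenOperator and its port names (input, even, odd) are simplified stand-ins written only for this illustration, and populate() stands in for populateDAG(DAG, Configuration). It only shows a module exposing proxy ports and pointing them at the inner operator's ports once the module is populated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in types so the sketch compiles on its own; these are NOT the Apex classes.
interface ProxyPort<T> {
  void set(T port);
  T get();
}

interface InputPort<T> {
  void process(T tuple);
}

class OutputPort<T> {
  private InputPort<T> sink;
  void connect(InputPort<T> sink) { this.sink = sink; }
  void emit(T tuple) { if (sink != null) { sink.process(tuple); } }
}

final class ProxyInputPort<T> implements ProxyPort<InputPort<T>> {
  private InputPort<T> actual;
  public void set(InputPort<T> port) { this.actual = port; }
  public InputPort<T> get() { return actual; }
}

final class ProxyOutputPort<T> implements ProxyPort<OutputPort<T>> {
  private OutputPort<T> actual;
  public void set(OutputPort<T> port) { this.actual = port; }
  public OutputPort<T> get() { return actual; }
}

// Hypothetical operator: routes each integer to the even or the odd output.
class OddEvenOperator {
  final OutputPort<Integer> even = new OutputPort<>();
  final OutputPort<Integer> odd = new OutputPort<>();
  final InputPort<Integer> input = this::route;

  private void route(Integer tuple) {
    if (tuple % 2 == 0) { even.emit(tuple); } else { odd.emit(tuple); }
  }
}

class TestModuleSketch {
  final ProxyInputPort<Integer> mInput = new ProxyInputPort<>();
  final ProxyOutputPort<Integer> mOutputEven = new ProxyOutputPort<>();
  final ProxyOutputPort<Integer> mOutputOdd = new ProxyOutputPort<>();

  // Stand-in for populateDAG(): create the inner operator and
  // record its actual ports inside the module's proxy ports.
  void populate() {
    OddEvenOperator moduleOperator = new OddEvenOperator();
    mInput.set(moduleOperator.input);
    mOutputEven.set(moduleOperator.even);
    mOutputOdd.set(moduleOperator.odd);
  }
}

class ProxyPortModuleDemo {
  // Feeds values through the module's proxy ports and returns [evens, odds].
  static List<List<Integer>> run(int... values) {
    TestModuleSketch module = new TestModuleSketch();
    module.populate();
    List<Integer> evens = new ArrayList<>();
    List<Integer> odds = new ArrayList<>();
    module.mOutputEven.get().connect(evens::add);
    module.mOutputOdd.get().connect(odds::add);
    for (int v : values) {
      module.mInput.get().process(v);
    }
    return Arrays.asList(evens, odds);
  }

  public static void main(String[] args) {
    System.out.println(run(1, 2, 3, 4)); // prints [[2, 4], [1, 3]]
  }
}
```

Before populate() runs, the proxy ports are empty, which mirrors the module being a black box until it is expanded.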



Please give your feedback on this approach.



On Wed, Oct 14, 2015 at 12:26 PM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:

> Hi All,
> As part of supporting modules, we need a mechanism for mapping a module's
> ports to the ports of the operators inside it. When an Apex application
> defines a DAG, a module is visible only as a black box until its
> populateDAG() method is called. Until then, only the module's ports are
> exposed, so streams are created against the module's ports, and the ports
> of the operators within the module stay hidden. Once the DAG is expanded,
> the operators inside the module are exposed and we can determine the
> correct end points for the streams that were defined earlier.
> Here is the JIRA:
> https://malhar.atlassian.net/browse/APEX-194
> We are planning to take the following approach to address this:
>    - Define a type ProxyPort<T> that wraps an actual port, where T is an
>    InputPort or an OutputPort.
>
>        public interface ProxyPort<T> extends Port
>        {
>          void set(T port);
>          T get();
>        }
>
>        public final class ProxyInputPort<T> extends DefaultInputPort<T>
>            implements ProxyPort<InputPort<T>>
>        public final class ProxyOutputPort<T> extends DefaultOutputPort<T>
>            implements ProxyPort<OutputPort<T>>
>
>    - Use this ProxyPort as the port type for a module.
>
>        public transient ProxyInputPort<Integer> moduleInput = new
>            Operator.ProxyInputPort<Integer>();
>        public transient ProxyOutputPort<Integer> moduleOutput = new
>            Operator.ProxyOutputPort<Integer>();
>
>    - Create streams using the ProxyPort when the populateDAG() method of
>    the application is called. Effectively, we plan to capture the stream
>    end points in terms of ProxyPorts when addStream() is called.
>    - Once the modules are expanded and the operators are visible, replace
>    the ProxyPorts with the actual operator ports.
> Thanks.
> --
> -Bhupesh
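
The last two steps of the quoted plan (capture stream end points as ProxyPorts in addStream(), then swap in the actual ports after expansion) could look roughly like the sketch below. This is an illustration under stated assumptions, not the engine's implementation: SketchPort, SketchProxyPort, SketchDag, replaceProxies() and the port names are all hypothetical, and the real DAG and Port types are not used.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in port type for this sketch; not the Apex Port interface.
class SketchPort {
  final String name;
  SketchPort(String name) { this.name = name; }
}

// A proxy is a port that can later be pointed at an actual port.
final class SketchProxyPort extends SketchPort {
  private SketchPort actual;
  SketchProxyPort(String name) { super(name); }
  void set(SketchPort port) { this.actual = port; }
  SketchPort get() { return actual; }
}

class SketchDag {
  // Stream id -> end points, stored exactly as they were passed to addStream().
  private final Map<String, List<SketchPort>> streams = new LinkedHashMap<>();

  void addStream(String id, SketchPort source, SketchPort... sinks) {
    List<SketchPort> ends = new ArrayList<>();
    ends.add(source);
    for (SketchPort sink : sinks) {
      ends.add(sink);
    }
    streams.put(id, ends);
  }

  // Follows proxies until an actual port is reached (handles nested modules).
  // Assumes proxies do not form a cycle.
  static SketchPort resolve(SketchPort port) {
    while (port instanceof SketchProxyPort) {
      SketchPort next = ((SketchProxyPort) port).get();
      if (next == null) {
        throw new IllegalStateException("proxy port " + port.name + " was never set");
      }
      port = next;
    }
    return port;
  }

  // Called once modules are expanded: swap every proxy end point for its actual port.
  void replaceProxies() {
    for (List<SketchPort> ends : streams.values()) {
      ends.replaceAll(SketchDag::resolve);
    }
  }

  List<String> endPointNames(String id) {
    List<String> names = new ArrayList<>();
    for (SketchPort port : streams.get(id)) {
      names.add(port.name);
    }
    return names;
  }
}

class ProxyResolutionDemo {
  static List<String> run() {
    SketchPort generatorOut = new SketchPort("generator.out");
    SketchProxyPort moduleIn = new SketchProxyPort("module.mInput");

    SketchDag dag = new SketchDag();
    dag.addStream("numbers", generatorOut, moduleIn); // module is still a black box

    moduleIn.set(new SketchPort("oddEven.input"));    // module expansion sets the proxy
    dag.replaceProxies();
    return dag.endPointNames("numbers");
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [generator.out, oddEven.input]
  }
}
```

Resolving in a loop rather than a single get() is a design choice made here so that a module nested inside another module would also resolve to a real operator port; the quoted proposal does not say how nesting is handled.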
