commons-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ken Tanaka <>
Subject Re: [PIPELINE] Questions about pipeline
Date Tue, 28 Oct 2008 21:24:15 GMT

Tim Dudgeon wrote:
> Ken Tanaka wrote:
>> Hi Tim, 
>> At present, the structure for storing stages is a linked list, and 
>> branches are implemented as additional pipelines accessed by a name 
>> through a HashMap. To generally handle branching and merging, a 
>> directed acyclic graph (DAG) would better serve, but that would 
>> require the pipeline code to be rewritten at this level. Arguments 
>> could also be made for allowing cycles, as in directed graphs, but 
>> that would be harder to debug, and with a GUI might be a step toward 
>> a visual programming language--so I don't think this should be 
>> pursued yet unless there are volunteers...
> I agree, DAG would be better, but cycles could be needeed too, so DG 
> would be better too.
> But, yes, I am ideally wanting visual designer too.
I'd like a visual designer too at some point, but that's a ways off into 
the future.
>>> Taken together I can see a generalisation here using named ports 
>>> (input and outut), which is similar, but not identical, to your 
>>> current concept of branches.
>>> So you have:
>>> BaseStage.emit(String branch, Object obj);
>>> whereas I would conceptually see this as:
>>> emit(String port, Object obj);
>>> and you have:
>>> Stage.process(Object obj);
>>> whereas I would would conceptually see this as:
>>> Stage.process(String port, Object obj);
>>> And when a pipeline is being assembled a downstream stage is 
>>> attached to a particular port of a stage, not the stage itself. It 
>>> then just recieves data sent to that particular port, but not the 
>>> other ports.
>> I could see that this would work, but would need either modifying a 
>> number of stages already written, or maybe creating a compatibility 
>> stage driver that takes older style stages so that the input object 
>> comes from a configured port name, usually "input" and a sends the 
>> output to  configured output ports named "output" and whatever the 
>> previous branch name(s) were, if any. Stages that used to look for 
>> events for input should be rewritten to read multiple inputs ( 
>> Stage.process(String port, Object obj) as you suggested). Events 
>> would then be reserved for truly out-of-band signals between stages 
>> rather than carrying data for processing.
> Agreed, I think with would be good. I think existing stages could be 
> made compatible by having a default input and output port, and to use 
> those if not specific port was specified.
> A default in/out port would probably be necessary to allow simple 
> auto-wiring.
>>> I'd love to hear how compatible the current system is with this way 
>>> of seeing things. Are we just talking about a new type of Stage 
>>> implementation, or a more fundamental incompatibility at the API level.
>> I think you have some good ideas. This is changing the Stage 
>> implementation, which affects on the order of 60 stages for us that 
>> override the process method, unless the compatibility stage driver 
>> works out. The top level pipeline would also be restructured. The 
>> amount of work required puts this out of the near term for me to work 
>> on it, but there may be other developers/contributors to take this on.
> I need to investigate more fully here, and consider the other options.
> But potentially this is certainly of interest.
> So is all that's necessary to prototype this to create a new Stage 
> implementation, with new emit( ... ) and process( ... ) methods?
I'm thinking it's more involved than that. To really deal well with the 
arbitrary number of downstream stages rather than just one means 
changing the digester rules 

on specifying what follows. Normally a stage is connected to the 
preceding stage if it is listed in that order in the configuration file. 
This should be a default behavior, but if  stage2 and stage3 both follow 
stage1 then some notation of which is the previous stage is needed.


might be set up as conf_pipe.xml:
   <stage className="com.demo.pipeline.stages.Stage1" 
driverFactoryId="df1" stageId="stage1"/>
   <stage className="com.demo.pipeline.stages.Stage2" 
   <stage className="com.demo.pipeline.stages.Stage3" 
driverFactoryId="df1" follows="stage1"/>

I propose the 'follows="stage1"' attribute to connect stage3 to stage1 
instead of stage2 immediately preceding. This seems cleaner than setting 
up a branch and matching up branch key names between the branching stage 
and the secondary pipeline(s). Can you think of a cleaner way to 
configure this?

The class will need to be modified to build and maintain a 
DAG structure rather than a linked list. The incoming data are managed 
by a queue in the stage driver, which would change to a group of queues, 
allowing multiple inputs (ports). I'm assuming there is an open source 
directed acyclic graph library out there that can replace the linked list.


To unsubscribe, e-mail:
For additional commands, e-mail:

View raw message