commons-user mailing list archives

From Ken Tanaka <Ken.Tan...@noaa.gov>
Subject Re: [PIPELINE] Questions about pipeline
Date Fri, 26 Sep 2008 18:17:31 GMT
Hello Istvan,

I recommend limiting the size of the input queue used by the stage 
driver. The DedicatedThreadStageDriverFactory uses a blocking queue, so 
a fast upstream stage will wait once the queue of a slow stage is full. 
If you are configuring your pipeline with a Digester configuration file, 
the XML would look like this (look for the word "capacity" near the end):

<?xml version="1.0" encoding="UTF-8"?>

<!--
    Document   : conf_examplepipeline.xml
    Description:
        Configuration file for data loading pipeline
-->

<pipeline>
   
    <driverFactory
        className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
        id="df1" faultToleranceLevel="ALL"/>
   
    <!-- Find the data file(s), surveyData.YYYY-MM-DD.
         ** FAST STAGE **
         Input:  starting directory
         Output: Data File object
    -->
    <!-- ================================================================== -->
    <stage className="org.apache.commons.pipeline.stage.FileFinderStage"
           filePattern="surveyData.*"
           driverFactoryId="df1" />
   
    <feed>
        <value>/data/prod/ingestNow</value>
    </feed>
   


    <!-- Unpack the data file. Each input file becomes a bunch of data bean objects.
         ** FAST STAGE **
         Input: Data File object
         Output: Data point object
    -->
    <!-- ================================================================== -->
    <stage className="gov.noaa.eds.SurveyReaderStage"
           driverFactoryId="df1" />

    <!-- Write the data beans to the database
         ** SLOW STAGE - Set ArrayBlockingQueueFactory capacity to 10 **
        
         The queueFactory must be a blocking type that implements the
         java.util.concurrent.BlockingQueue interface.
        
         Input:  Data point object
         Output: Data point object
    -->
    <!-- ================================================================== -->
    <stage className="gov.noaa.eds.WriteSurveyToDatabaseStage"
           driverFactoryId="df1" >
        <property propName="queueFactory"
                  className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
                  capacity="10" fair="false"/>
    </stage>

</pipeline>

Alternatively, you could set up the driver factory with the same 
property so that all stages have the same input queue size:

    <driverFactory
        className="org.apache.commons.pipeline.driver.DedicatedThreadStageDriverFactory"
        id="df1" faultToleranceLevel="ALL">
        <property propName="queueFactory"
                  className="org.apache.commons.pipeline.util.BlockingQueueFactory$ArrayBlockingQueueFactory"
                  capacity="10" fair="false"/>
    </driverFactory>
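
To illustrate why a bounded blocking queue prevents the "traffic jam", here is a minimal standalone sketch using only java.util.concurrent, not the Commons Pipeline classes. The class name BoundedQueueDemo and the method run are made up for this example; the capacity and fairness arguments mirror capacity="10" fair="false" in the XML above. The fast producer blocks in put() whenever the queue is full, so the backlog cannot grow beyond the capacity.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; it does not use any Commons Pipeline API.
class BoundedQueueDemo {

    /**
     * Feeds `items` integers from a fast producer to a slow consumer through
     * a bounded queue and returns the largest queue depth the producer saw.
     */
    static int run(int capacity, int items) {
        // Fixed capacity, non-fair ordering of waiting threads.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity, false);

        Thread slowStage = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.take();      // waits if the queue is empty
                    Thread.sleep(1);   // simulate slow work, e.g. a database write
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        slowStage.start();

        int maxDepth = 0;
        try {
            for (int i = 0; i < items; i++) {
                queue.put(i);          // blocks while the queue is full
                maxDepth = Math.max(maxDepth, queue.size());
            }
            slowStage.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxDepth;
    }

    public static void main(String[] args) {
        System.out.println("max queue depth: " + run(10, 200));
    }
}
```

However many items are fed in, the reported depth never exceeds the capacity, which is the behaviour you want in front of the slow stage.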

Aquator wrote:
> Hi,
>
> I am playing with pipeline, and I have some questions regarding its usage.
>
> What is the suggested method of avoiding a "traffic jam" in the pipe? I mean, when a stage
> that produces results fast is followed by a long-running stage. I will run out of stack
> space with a large amount of input data.
>
> Currently, my solution uses the context raise/registerListener methods. The slow stage
> notifies its "feeder" stage that new data can be processed. Are there any better ideas for
> this problem?
>
> My other issue is about branches. Is there a way to attach separate branches together?
> For example, a stage needs input from two different branches. Is there any solution to apply
> a synchronized data flow? (Let's say I have two branches, one produces A-s, and the other
> produces B-s. I want a stage that is fed by those two branches and produces a sequence of
> ABABAB...) Is there an implementation for such behaviour?
>
>   
Normally each branch should carry just one type of data; otherwise all 
the stages on a branch end up having the same conditional logic at the 
beginning to sort out what to do with the incoming objects. If you do 
need to combine information back into another branch, you should do it 
by raising events, which can carry the data transport beans you wish to 
combine. The receiving stage needs to register a listener to bring those 
in. I have some documentation on this, but it's not publicly available 
yet. If you want, I can email you the HTML directly.
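
As a rough idea of the raise/register-listener pattern, here is a minimal single-threaded sketch in plain Java. It is not the Commons Pipeline API: Context, Listener, DataEvent and MergeStage are hypothetical stand-ins invented for this example, and a real pipeline with dedicated stage threads would need a thread-safe buffer in the receiving stage. Branch B raises events that carry its beans; the receiving stage on branch A buffers them and emits one B after each A.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.EventObject;
import java.util.List;

// Hypothetical sketch; none of these types belong to Commons Pipeline.
class BranchEventDemo {

    /** Event that carries a data bean (here just a String) from another branch. */
    static class DataEvent extends EventObject {
        final String payload;
        DataEvent(Object source, String payload) {
            super(source);
            this.payload = payload;
        }
    }

    /** Stand-in for a stage event listener. */
    interface Listener {
        void onEvent(EventObject ev);
    }

    /** Stand-in for the shared context that stages raise events on. */
    static class Context {
        private final List<Listener> listeners = new ArrayList<>();
        void registerListener(Listener l) { listeners.add(l); }
        void raise(EventObject ev) {
            for (Listener l : listeners) {
                l.onEvent(ev);
            }
        }
    }

    /** Receiving stage: gets A-s as normal input and B-s through events. */
    static class MergeStage implements Listener {
        private final Deque<String> pendingB = new ArrayDeque<>();
        final List<String> output = new ArrayList<>();

        @Override
        public void onEvent(EventObject ev) {
            if (ev instanceof DataEvent) {
                pendingB.add(((DataEvent) ev).payload);  // buffer the B bean
            }
        }

        void process(String a) {
            output.add(a);
            String b = pendingB.poll();                  // null if no B has arrived yet
            if (b != null) {
                output.add(b);
            }
        }
    }

    /** Wires the pieces together and returns the interleaved output. */
    static List<String> demo() {
        Context ctx = new Context();
        MergeStage merge = new MergeStage();
        ctx.registerListener(merge);

        // Branch B raises events that carry its beans.
        ctx.raise(new DataEvent("branch-B", "B1"));
        ctx.raise(new DataEvent("branch-B", "B2"));

        // Branch A feeds the merge stage directly.
        merge.process("A1");
        merge.process("A2");
        return merge.output;
    }
}
```

In this sketch an A that arrives before its matching B is emitted alone; a strict ABAB ordering would need the stage to hold the A back until a B is available.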
> Finally, I am interested in the various StageDrivers. I'd like some more detailed
> information than the API provides, especially usage advice and samples, to help choose
> the best StageDriver for particular stages.
>
> Thanks in advance for your time,
> Istvan Cseh 
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@commons.apache.org
> For additional commands, e-mail: user-help@commons.apache.org
>
>   


