synapse-dev mailing list archives

From Hiranya Jayathilaka <hiranya...@gmail.com>
Subject Re: svn commit: r773818 - in /synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix: FIXApplicationFactory.java FIXSessionFactory.java FIXTransportListener.java FIXTransportSender.java
Date Wed, 13 May 2009 10:12:48 GMT
On Wed, May 13, 2009 at 2:41 PM, Andreas Veithen
<andreas.veithen@gmail.com> wrote:

> We definitely need to make the WorkerPool created by
> AbstractTransportListener configurable. The question is whether this
> should be done using a property file or using parameters in
> TransportInDescription (as with all other configuration settings).
> What are the arguments in favor of doing it in a separate property
> file?


Nothing significant, I guess. It would probably save us a few bytes of memory,
since with a file-based approach we don't have to keep those parameters in
the configuration context. But that's not a big motivating factor IMO. Maybe
the only plus point worth considering is that it's consistent with the
existing approach for configuring thread pools: we currently use
nhttp.properties to configure the HTTP-NIO thread pool.

But I'm ok with using transport parameters to do this.
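
To make the two options concrete, here is a rough sketch of how either
source could feed the same settings object. It is only an illustration:
WorkerPoolSettings and the "worker.pool.*" key names are hypothetical and
do not exist in Axis2 or Synapse, the transport parameters are modelled as
a plain Map, and I'm assuming the four integers passed to getWorkerPool in
r773818 (10, 20, 5, -1) mean core size, max size, keep-alive seconds and
queue length.

```java
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;

/** Hypothetical holder for worker pool sizing; not an Axis2 or Synapse class. */
final class WorkerPoolSettings {
    // Hypothetical key names, chosen only for this illustration.
    static final String CORE = "worker.pool.core";
    static final String MAX = "worker.pool.max";
    static final String KEEP_ALIVE = "worker.pool.keepalive";
    static final String QUEUE = "worker.pool.queue";

    final int core;
    final int max;
    final int keepAliveSeconds;
    final int queueLength;

    WorkerPoolSettings(int core, int max, int keepAliveSeconds, int queueLength) {
        this.core = core;
        this.max = max;
        this.keepAliveSeconds = keepAliveSeconds;
        this.queueLength = queueLength;
    }

    /** Fallbacks copied from the literals in r773818; their meaning is assumed. */
    static WorkerPoolSettings defaults() {
        return new WorkerPoolSettings(10, 20, 5, -1);
    }

    /** Option 1: values arrive as transport parameters (modelled as a Map). */
    static WorkerPoolSettings fromParameters(Map<String, String> params) {
        return from(params::get);
    }

    /** Option 2: values arrive from a per-transport properties file. */
    static WorkerPoolSettings fromProperties(Properties props) {
        return from(props::getProperty);
    }

    /** File name derived from the transport name, e.g. "mail" gives "mail.properties". */
    static String propertiesFileName(String transportName) {
        return transportName + ".properties";
    }

    // Both options share one lookup path, so only the source of values differs.
    private static WorkerPoolSettings from(Function<String, String> lookup) {
        WorkerPoolSettings d = defaults();
        return new WorkerPoolSettings(
                parse(lookup.apply(CORE), d.core),
                parse(lookup.apply(MAX), d.max),
                parse(lookup.apply(KEEP_ALIVE), d.keepAliveSeconds),
                parse(lookup.apply(QUEUE), d.queueLength));
    }

    // Missing or malformed values fall back to the default instead of failing.
    private static int parse(String raw, int fallback) {
        if (raw == null) {
            return fallback;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return fallback;
        }
    }
}
```

Either way the abstract listener would end up with the same four numbers,
so the choice is mostly about where users expect to find the knobs.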

Thanks,
Hiranya


>
> Andreas
>
> On Wed, May 13, 2009 at 08:31, Hiranya Jayathilaka <hiranya911@gmail.com>
> wrote:
> >
> >
> > On Wed, May 13, 2009 at 10:30 AM, Ruwan Linton <ruwan.linton@gmail.com>
> > wrote:
> >>
> >> I think we need to make that configurable as well. The current
> >> hard-coded setting will work in 98% of cases, but there can be a
> >> scenario where it needs tuning.
> >>
> >> Can we do this in a manner that lets us configure them per transport?
> >
> > One simple solution would be to read a transport-specific configuration
> > file at AbstractTransportListener#init(). The init method gets a
> > TransportInDescription object as an argument, and from that we can
> > retrieve the transport name to construct a file name unique to a given
> > transport (e.g. mail.properties, vfs.properties). This approach has the
> > benefit that it doesn't require changes to any of the actual transport
> > implementations. Everything is taken care of by the abstract class.
> >
> > However, this class now belongs to the WS-Commons transports project, so
> > the enhancement should be made there.
> >
> > Thanks,
> > Hiranya
> >
> >
> >>
> >> Thanks,
> >> Ruwan
> >>
> >> On Wed, May 13, 2009 at 10:06 AM, Hiranya Jayathilaka
> >> <hiranya911@gmail.com> wrote:
> >>>
> >>> Hi Ruwan,
> >>>
> >>> On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton <ruwan.linton@gmail.com>
> >>> wrote:
> >>>>
> >>>> Hiranya,
> >>>>
> >>>> If you can make the worker pool configurable, that would be of much
> >>>> importance. You may have a look at the nhttp transport thread pool,
> >>>> which is configurable via the nhttp.properties file.
> >>>
> >>> Currently the FIX sender initializes the WorkerPool in a manner similar
> >>> to the AbstractTransportListener. The WorkerPool in
> >>> AbstractTransportListener is used by several transports (JMS, Mail etc.)
> >>> via inheritance. The FIX listener also makes use of the same thread
> >>> pool. Do we have any plans to make that thread pool configurable too?
> >>> Otherwise I don't think it makes much sense to make just the FIX
> >>> sender's thread pool configurable.
> >>>
> >>> Thanks,
> >>> Hiranya
> >>>
> >>>>
> >>>>
> >>>> Thanks,
> >>>> Ruwan
> >>>>
> >>>> On Tue, May 12, 2009 at 1:30 PM, <hiranya@apache.org> wrote:
> >>>>>
> >>>>> Author: hiranya
> >>>>> Date: Tue May 12 08:00:27 2009
> >>>>> New Revision: 773818
> >>>>>
> >>>>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
> >>>>> Log:
> >>>>> Enhancements and code cleanup in the FIX transport:
> >>>>> * FIX sender now has its own worker pool and hence does not rely on
> >>>>> the FIX listener any more. Therefore listener and sender can be
> >>>>> enabled individually
> >>>>> * Made FIXSessionFactory a singleton to effectively share session
> >>>>> data among the listener and the sender
> >>>>> * Cleanup logic for initiators during sender shutdown
> >>>>> * Minor bug fix at FIXSessionFactory for a bug which prevented the
> >>>>> sample 259 and similar scenarios from operating properly
> >>>>>
> >>>>> Modified:
> >>>>>     synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> >>>>>     synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> >>>>>     synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> >>>>>     synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> >>>>>
> >>>>> Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> >>>>> URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
> >>>>> ==============================================================================
> >>>>> --- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java (original)
> >>>>> +++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java Tue May 12 08:00:27 2009
> >>>>> @@ -27,15 +27,12 @@
> >>>>>  public class FIXApplicationFactory {
> >>>>>
> >>>>>     private ConfigurationContext cfgCtx;
> >>>>> -    private WorkerPool workerPool;
> >>>>> -
> >>>>> -    public FIXApplicationFactory(ConfigurationContext cfgCtx, WorkerPool workerPool) {
> >>>>>
> >>>>> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
> >>>>>         this.cfgCtx = cfgCtx;
> >>>>> -        this.workerPool = workerPool;
> >>>>>     }
> >>>>>
> >>>>> -    public Application getFIXApplication(AxisService service, boolean acceptor) {
> >>>>> +    public Application getFIXApplication(AxisService service, WorkerPool workerPool, boolean acceptor) {
> >>>>>         return new FIXIncomingMessageHandler(cfgCtx, workerPool, service, acceptor);
> >>>>>     }
> >>>>>  }
> >>>>> \ No newline at end of file
> >>>>>
> >>>>> Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> >>>>> URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
> >>>>> ==============================================================================
> >>>>> --- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java (original)
> >>>>> +++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java Tue May 12 08:00:27 2009
> >>>>> @@ -23,6 +23,7 @@
> >>>>>  import org.apache.axis2.description.AxisService;
> >>>>>  import org.apache.axis2.description.Parameter;
> >>>>>  import org.apache.axis2.transport.base.BaseUtils;
> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
> >>>>>  import org.apache.commons.logging.Log;
> >>>>>  import org.apache.commons.logging.LogFactory;
> >>>>>  import quickfix.*;
> >>>>> @@ -65,16 +66,29 @@
> >>>>>     /** A Map containing all the FIX applications created for initiators, keyed by FIX EPR */
> >>>>>     private Map<String, Application> applicationStore;
> >>>>>     /** An ApplicationFactory handles creating FIX Applications (FIXIncomingMessageHandler Objects) */
> >>>>> -    private FIXApplicationFactory applicationFactory;
> >>>>> +    private static FIXApplicationFactory applicationFactory = null;
> >>>>> +
> >>>>> +    private WorkerPool listenerThreadPool;
> >>>>> +    private WorkerPool senderThreadPool;
> >>>>>
> >>>>>     private Log log;
> >>>>>
> >>>>> -    public FIXSessionFactory(FIXApplicationFactory applicationFactory) {
> >>>>> -        this.applicationFactory = applicationFactory;
> >>>>> +    private static FIXSessionFactory INSTANCE = new FIXSessionFactory();
> >>>>> +
> >>>>> +    public static FIXSessionFactory getInstance(FIXApplicationFactory af) {
> >>>>> +        if (applicationFactory == null) {
> >>>>> +            applicationFactory = af;
> >>>>> +        }
> >>>>> +        return INSTANCE;
> >>>>> +    }
> >>>>> +
> >>>>> +    private FIXSessionFactory() {
> >>>>>         this.log = LogFactory.getLog(this.getClass());
> >>>>>         this.acceptorStore = new HashMap<String,Acceptor>();
> >>>>>         this.initiatorStore = new HashMap<String, Initiator>();
> >>>>>         this.applicationStore = new HashMap<String, Application>();
> >>>>> +        this.listenerThreadPool = null;
> >>>>> +        this.senderThreadPool = null;
> >>>>>     }
> >>>>>
> >>>>>     /**
> >>>>> @@ -101,7 +115,7 @@
> >>>>>                 MessageFactory messageFactory = new DefaultMessageFactory();
> >>>>>                 quickfix.LogFactory logFactory = getLogFactory(service, settings, true);
> >>>>>                 //Get a new FIX Application
> >>>>> -                Application messageHandler = applicationFactory.getFIXApplication(service, true);
> >>>>> +                Application messageHandler = applicationFactory.getFIXApplication(service, listenerThreadPool, true);
> >>>>>                 //Create a new FIX Acceptor
> >>>>>                 Acceptor acceptor = new SocketAcceptor(
> >>>>>                         messageHandler,
> >>>>> @@ -174,7 +188,7 @@
> >>>>>         MessageStoreFactory storeFactory = getMessageStoreFactory(service, settings, false);
> >>>>>         MessageFactory messageFactory = new DefaultMessageFactory();
> >>>>>         //Get a new FIX application
> >>>>> -        Application messageHandler = applicationFactory.getFIXApplication(service, false);
> >>>>> +        Application messageHandler = applicationFactory.getFIXApplication(service, senderThreadPool, false);
> >>>>>
> >>>>>         try {
> >>>>>            //Create a new FIX initiator
> >>>>> @@ -216,7 +230,7 @@
> >>>>>                 MessageFactory messageFactory = new DefaultMessageFactory();
> >>>>>                 quickfix.LogFactory logFactory = getLogFactory(service, settings, true);
> >>>>>                 //Get a new FIX Application
> >>>>> -                Application messageHandler = applicationFactory.getFIXApplication(service, false);
> >>>>> +                Application messageHandler = applicationFactory.getFIXApplication(service, senderThreadPool, false);
> >>>>>
> >>>>>                 Initiator initiator = new SocketInitiator(
> >>>>>                     messageHandler,
> >>>>> @@ -246,10 +260,10 @@
> >>>>>             }
> >>>>>
> >>>>>         } else {
> >>>>> -            String msg = "The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
> >>>>> -                    "not specified. Unable to initialize the initiator session at this stage.";
> >>>>> -            log.info(msg);
> >>>>> -            throw new AxisFault(msg);
> >>>>> +            // FIX initiator session is not configured
> >>>>> +            // It could be intentional - So not an error (we don't need initiators at all times)
> >>>>> +            log.info("The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
> >>>>> +                    "not specified. Unable to initialize the initiator session at this stage.");
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>> @@ -276,6 +290,24 @@
> >>>>>     }
> >>>>>
> >>>>>     /**
> >>>>> +     * Stops all the FIX initiators created so far and cleans up all the mappings
> >>>>> +     * related to them
> >>>>> +     */
> >>>>> +    public void disposeFIXInitiators() {
> >>>>> +        boolean debugEnabled = log.isDebugEnabled();
> >>>>> +
> >>>>> +        for (String key : initiatorStore.keySet()) {
> >>>>> +            initiatorStore.get(key).stop();
> >>>>> +            if (debugEnabled) {
> >>>>> +                log.debug("FIX initiator to the EPR " + key + " stopped");
> >>>>> +            }
> >>>>> +        }
> >>>>> +
> >>>>> +        initiatorStore.clear();
> >>>>> +        applicationStore.clear();
> >>>>> +    }
> >>>>> +
> >>>>> +    /**
> >>>>>      * Returns an array of Strings representing EPRs for the specified service
> >>>>>      *
> >>>>>      * @param serviceName the name of the service
> >>>>> @@ -444,6 +476,14 @@
> >>>>>         }
> >>>>>         return app;
> >>>>>     }
> >>>>> +
> >>>>> +    public void setListenerThreadPool(WorkerPool listenerThreadPool) {
> >>>>> +        this.listenerThreadPool = listenerThreadPool;
> >>>>> +    }
> >>>>> +
> >>>>> +    public void setSenderThreadPool(WorkerPool senderThreadPool) {
> >>>>> +        this.senderThreadPool = senderThreadPool;
> >>>>> +    }
> >>>>>  }
> >>>>>
> >>>>>
> >>>>>
> >>>>> Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> >>>>> URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
> >>>>> ==============================================================================
> >>>>> --- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java (original)
> >>>>> +++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java Tue May 12 08:00:27 2009
> >>>>> @@ -60,14 +60,8 @@
> >>>>>                      TransportInDescription trpInDesc) throws AxisFault {
> >>>>>
> >>>>>         super.init(cfgCtx, trpInDesc);
> >>>>> -        //initialize the FIXSessionFactory
> >>>>> -        fixSessionFactory = new FIXSessionFactory(
> >>>>> -                new FIXApplicationFactory(this.cfgCtx, this.workerPool));
> >>>>> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
> >>>>> -                getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
> >>>>> -        if (sender != null) {
> >>>>> -            sender.setSessionFactory(fixSessionFactory);
> >>>>> -        }
> >>>>> +        fixSessionFactory = FIXSessionFactory.getInstance(new FIXApplicationFactory(cfgCtx));
> >>>>> +        fixSessionFactory.setListenerThreadPool(this.workerPool);
> >>>>>         log.info("FIX transport listener initialized...");
> >>>>>     }
> >>>>>
> >>>>>
> >>>>> Modified: synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> >>>>> URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
> >>>>> ==============================================================================
> >>>>> --- synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java (original)
> >>>>> +++ synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java Tue May 12 08:00:27 2009
> >>>>> @@ -28,6 +28,8 @@
> >>>>>  import org.apache.axis2.transport.OutTransportInfo;
> >>>>>  import org.apache.axis2.transport.base.AbstractTransportSender;
> >>>>>  import org.apache.axis2.transport.base.BaseUtils;
> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
> >>>>> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
> >>>>>  import org.apache.commons.logging.LogFactory;
> >>>>>  import quickfix.*;
> >>>>>  import quickfix.field.*;
> >>>>> @@ -51,17 +53,12 @@
> >>>>>
> >>>>>     private FIXSessionFactory sessionFactory;
> >>>>>     private FIXOutgoingMessageHandler messageSender;
> >>>>> +    private WorkerPool workerPool;
> >>>>>
> >>>>>     public FIXTransportSender() {
> >>>>>         this.log = LogFactory.getLog(this.getClass());
> >>>>>     }
> >>>>>
> >>>>> -
> >>>>> -    public void setSessionFactory(FIXSessionFactory sessionFactory) {
> >>>>> -        this.sessionFactory = sessionFactory;
> >>>>> -        this.messageSender.setSessionFactory(sessionFactory);
> >>>>> -    }
> >>>>> -
> >>>>>     /**
> >>>>>      * @param cfgCtx       the axis2 configuration context
> >>>>>      * @param transportOut the Out Transport description
> >>>>> @@ -69,10 +66,25 @@
> >>>>>      */
> >>>>>     public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault {
> >>>>>         super.init(cfgCtx, transportOut);
> >>>>> +        this.sessionFactory = FIXSessionFactory.getInstance(new FIXApplicationFactory(cfgCtx));
> >>>>> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
> >>>>> +                            10, 20, 5, -1, "FIX Sender Worker thread group", "FIX-Worker");
> >>>>> +        this.sessionFactory.setSenderThreadPool(this.workerPool);
> >>>>>         messageSender = new FIXOutgoingMessageHandler();
> >>>>> +        messageSender.setSessionFactory(this.sessionFactory);
> >>>>>         log.info("FIX transport sender initialized...");
> >>>>>     }
> >>>>>
> >>>>> +    public void stop() {
> >>>>> +        try {
> >>>>> +            this.workerPool.shutdown(10000);
> >>>>> +        } catch (InterruptedException e) {
> >>>>> +            log.warn("Thread interrupted while waiting for worker pool to shut down");
> >>>>> +        }
> >>>>> +        sessionFactory.disposeFIXInitiators();
> >>>>> +        super.stop();
> >>>>> +    }
> >>>>> +
> >>>>>     /**
> >>>>>      * Performs the actual sending of the message.
> >>>>>      *
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Ruwan Linton
> >>>> Senior Software Engineer & Product Manager; WSO2 ESB;
> >>>> http://wso2.org/esb
> >>>> WSO2 Inc.; http://wso2.org
> >>>> email: ruwan@wso2.com; cell: +94 77 341 3097
> >>>> blog: http://ruwansblog.blogspot.com
> >>>
> >>>
> >>>
> >>> --
> >>> Hiranya Jayathilaka
> >>> Software Engineer;
> >>> WSO2 Inc.;  http://wso2.org
> >>> E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
> >>> Blog: http://techfeast-hiranya.blogspot.com
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@synapse.apache.org
> For additional commands, e-mail: dev-help@synapse.apache.org
>
>


-- 
Hiranya Jayathilaka
Software Engineer;
WSO2 Inc.;  http://wso2.org
E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
Blog: http://techfeast-hiranya.blogspot.com
