synapse-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hiranya Jayathilaka <hiranya...@gmail.com>
Subject Re: svn commit: r773818 - in /synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix: FIXApplicationFactory.java FIXSessionFactory.java FIXTransportListener.java FIXTransportSender.java
Date Wed, 13 May 2009 04:36:36 GMT
Hi Ruwan,

On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton <ruwan.linton@gmail.com>wrote:

> Hiranya,
>
> If you can make the worker pool configurable that would be of much
> importance... you may have a look at the nhttp transport thread pool, which
> can be configurable via the nhttp.properties file.


Currently the FIX sender initializes the WorkerPool in a manner similar to
the AbstractTransportListener. The WorkerPool in AbstractTransportListener
is used by several transports (JMS, Mail etc) via inheritance. FIX listener
also makes use of the same thread pool. Do we have any plans to make that
thread pool configurable too? Othrewise I don't think it mkes much sense
just to make the FIX sender's thread pool configurable.

Thanks,
Hiranya


>
> Thanks,
> Ruwan
>
> On Tue, May 12, 2009 at 1:30 PM, <hiranya@apache.org> wrote:
>
>> Author: hiranya
>> Date: Tue May 12 08:00:27 2009
>> New Revision: 773818
>>
>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
>> Log:
>> Enhancements and code cleanup in the FIX transport:
>> * FIX sender now has its own worker pool and hence does not rely on the
>> FIX listener any more. Therefore listener and sender can be enabled
>> individually
>> * Made FIXSessionFactory a singleton to effectively share session data
>> among the listener and the sender
>> * Cleanup logic for initiators during sender shutdown
>> * Minor bug fix at FIXSessionFactory for a bug which prevented the sample
>> 259 and similar scenarios from operating properly
>>
>> Modified:
>>
>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>
>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>
>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>
>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>
>> Modified:
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>> URL:
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>
>> ==============================================================================
>> ---
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>> (original)
>> +++
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>> Tue May 12 08:00:27 2009
>> @@ -27,15 +27,12 @@
>>  public class FIXApplicationFactory {
>>
>>     private ConfigurationContext cfgCtx;
>> -    private WorkerPool workerPool;
>> -
>> -    public FIXApplicationFactory(ConfigurationContext cfgCtx, WorkerPool
>> workerPool) {
>>
>> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
>>         this.cfgCtx = cfgCtx;
>> -        this.workerPool = workerPool;
>>     }
>>
>> -    public Application getFIXApplication(AxisService service, boolean
>> acceptor) {
>> +    public Application getFIXApplication(AxisService service, WorkerPool
>> workerPool, boolean acceptor) {
>>         return new FIXIncomingMessageHandler(cfgCtx, workerPool, service,
>> acceptor);
>>     }
>>  }
>> \ No newline at end of file
>>
>> Modified:
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>> URL:
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>
>> ==============================================================================
>> ---
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>> (original)
>> +++
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>> Tue May 12 08:00:27 2009
>> @@ -23,6 +23,7 @@
>>  import org.apache.axis2.description.AxisService;
>>  import org.apache.axis2.description.Parameter;
>>  import org.apache.axis2.transport.base.BaseUtils;
>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>  import org.apache.commons.logging.Log;
>>  import org.apache.commons.logging.LogFactory;
>>  import quickfix.*;
>> @@ -65,16 +66,29 @@
>>     /** A Map containing all the FIX applications created for initiators,
>> keyed by FIX EPR */
>>     private Map<String, Application> applicationStore;
>>     /** An ApplicationFactory handles creating FIX Applications
>> (FIXIncomingMessageHandler Objects) */
>> -    private FIXApplicationFactory applicationFactory;
>> +    private static FIXApplicationFactory applicationFactory = null;
>> +
>> +    private WorkerPool listenerThreadPool;
>> +    private WorkerPool senderThreadPool;
>>
>>     private Log log;
>>
>> -    public FIXSessionFactory(FIXApplicationFactory applicationFactory) {
>> -        this.applicationFactory = applicationFactory;
>> +    private static FIXSessionFactory INSTANCE = new FIXSessionFactory();
>> +
>> +    public static FIXSessionFactory getInstance(FIXApplicationFactory af)
>> {
>> +        if (applicationFactory == null) {
>> +            applicationFactory = af;
>> +        }
>> +        return INSTANCE;
>> +    }
>> +
>> +    private FIXSessionFactory() {
>>         this.log = LogFactory.getLog(this.getClass());
>>         this.acceptorStore = new HashMap<String,Acceptor>();
>>         this.initiatorStore = new HashMap<String, Initiator>();
>>         this.applicationStore = new HashMap<String, Application>();
>> +        this.listenerThreadPool = null;
>> +        this.senderThreadPool = null;
>>     }
>>
>>     /**
>> @@ -101,7 +115,7 @@
>>                 MessageFactory messageFactory = new
>> DefaultMessageFactory();
>>                 quickfix.LogFactory logFactory = getLogFactory(service,
>> settings, true);
>>                 //Get a new FIX Application
>> -                Application messageHandler =
>> applicationFactory.getFIXApplication(service, true);
>> +                Application messageHandler =
>> applicationFactory.getFIXApplication(service, listenerThreadPool, true);
>>                 //Create a new FIX Acceptor
>>                 Acceptor acceptor = new SocketAcceptor(
>>                         messageHandler,
>> @@ -174,7 +188,7 @@
>>         MessageStoreFactory storeFactory = getMessageStoreFactory(service,
>> settings, false);
>>         MessageFactory messageFactory = new DefaultMessageFactory();
>>         //Get a new FIX application
>> -        Application messageHandler =
>> applicationFactory.getFIXApplication(service, false);
>> +        Application messageHandler =
>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>
>>         try {
>>            //Create a new FIX initiator
>> @@ -216,7 +230,7 @@
>>                 MessageFactory messageFactory = new
>> DefaultMessageFactory();
>>                 quickfix.LogFactory logFactory = getLogFactory(service,
>> settings, true);
>>                 //Get a new FIX Application
>> -                Application messageHandler =
>> applicationFactory.getFIXApplication(service, false);
>> +                Application messageHandler =
>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>
>>                 Initiator initiator = new SocketInitiator(
>>                     messageHandler,
>> @@ -246,10 +260,10 @@
>>             }
>>
>>         } else {
>> -            String msg = "The " +
>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>> -                    "not specified. Unable to initialize the initiator
>> session at this stage.";
>> -            log.info(msg);
>> -            throw new AxisFault(msg);
>> +            // FIX initiator session is not configured
>> +            // It could be intentional - So not an error (we don't need
>> initiators at all times)
>> +            log.info("The " +
>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>> +                    "not specified. Unable to initialize the initiator
>> session at this stage.");
>>         }
>>     }
>>
>> @@ -276,6 +290,24 @@
>>     }
>>
>>     /**
>> +     * Stops all the FIX initiators created so far and cleans up all the
>> mappings
>> +     * related to them
>> +     */
>> +    public void disposeFIXInitiators() {
>> +        boolean debugEnabled = log.isDebugEnabled();
>> +
>> +        for (String key : initiatorStore.keySet()) {
>> +            initiatorStore.get(key).stop();
>> +            if (debugEnabled) {
>> +                log.debug("FIX initiator to the EPR " + key + "
>> stopped");
>> +            }
>> +        }
>> +
>> +        initiatorStore.clear();
>> +        applicationStore.clear();
>> +    }
>> +
>> +    /**
>>      * Returns an array of Strings representing EPRs for the specified
>> service
>>      *
>>      * @param serviceName the name of the service
>> @@ -444,6 +476,14 @@
>>         }
>>         return app;
>>     }
>> +
>> +    public void setListenerThreadPool(WorkerPool listenerThreadPool) {
>> +        this.listenerThreadPool = listenerThreadPool;
>> +    }
>> +
>> +    public void setSenderThreadPool(WorkerPool senderThreadPool) {
>> +        this.senderThreadPool = senderThreadPool;
>> +    }
>>  }
>>
>>
>>
>> Modified:
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>> URL:
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
>>
>> ==============================================================================
>> ---
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>> (original)
>> +++
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>> Tue May 12 08:00:27 2009
>> @@ -60,14 +60,8 @@
>>                      TransportInDescription trpInDesc) throws AxisFault {
>>
>>         super.init(cfgCtx, trpInDesc);
>> -        //initialize the FIXSessionFactory
>> -        fixSessionFactory = new FIXSessionFactory(
>> -                new FIXApplicationFactory(this.cfgCtx, this.workerPool));
>> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
>> -
>>  getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
>> -        if (sender != null) {
>> -            sender.setSessionFactory(fixSessionFactory);
>> -        }
>> +        fixSessionFactory = FIXSessionFactory.getInstance(new
>> FIXApplicationFactory(cfgCtx));
>> +        fixSessionFactory.setListenerThreadPool(this.workerPool);
>>         log.info("FIX transport listener initialized...");
>>     }
>>
>>
>> Modified:
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>> URL:
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
>>
>> ==============================================================================
>> ---
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>> (original)
>> +++
>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>> Tue May 12 08:00:27 2009
>> @@ -28,6 +28,8 @@
>>  import org.apache.axis2.transport.OutTransportInfo;
>>  import org.apache.axis2.transport.base.AbstractTransportSender;
>>  import org.apache.axis2.transport.base.BaseUtils;
>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
>>  import org.apache.commons.logging.LogFactory;
>>  import quickfix.*;
>>  import quickfix.field.*;
>> @@ -51,17 +53,12 @@
>>
>>     private FIXSessionFactory sessionFactory;
>>     private FIXOutgoingMessageHandler messageSender;
>> +    private WorkerPool workerPool;
>>
>>     public FIXTransportSender() {
>>         this.log = LogFactory.getLog(this.getClass());
>>     }
>>
>> -
>> -    public void setSessionFactory(FIXSessionFactory sessionFactory) {
>> -        this.sessionFactory = sessionFactory;
>> -        this.messageSender.setSessionFactory(sessionFactory);
>> -    }
>> -
>>     /**
>>      * @param cfgCtx       the axis2 configuration context
>>      * @param transportOut the Out Transport description
>> @@ -69,10 +66,25 @@
>>      */
>>     public void init(ConfigurationContext cfgCtx, TransportOutDescription
>> transportOut) throws AxisFault {
>>         super.init(cfgCtx, transportOut);
>> +        this.sessionFactory = FIXSessionFactory.getInstance(new
>> FIXApplicationFactory(cfgCtx));
>> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
>> +                            10, 20, 5, -1, "FIX Sender Worker thread
>> group", "FIX-Worker");
>> +        this.sessionFactory.setSenderThreadPool(this.workerPool);
>>         messageSender = new FIXOutgoingMessageHandler();
>> +        messageSender.setSessionFactory(this.sessionFactory);
>>         log.info("FIX transport sender initialized...");
>>     }
>>
>> +    public void stop() {
>> +        try {
>> +            this.workerPool.shutdown(10000);
>> +        } catch (InterruptedException e) {
>> +            log.warn("Thread interrupted while waiting for worker pool to
>> shut down");
>> +        }
>> +        sessionFactory.disposeFIXInitiators();
>> +        super.stop();
>> +    }
>> +
>>     /**
>>      * Performs the actual sending of the message.
>>      *
>>
>>
>>
>
>
> --
> Ruwan Linton
> Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
> WSO2 Inc.; http://wso2.org
> email: ruwan@wso2.com; cell: +94 77 341 3097
> blog: http://ruwansblog.blogspot.com
>



-- 
Hiranya Jayathilaka
Software Engineer;
WSO2 Inc.;  http://wso2.org
E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
Blog: http://techfeast-hiranya.blogspot.com

Mime
View raw message