synapse-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ruwan Linton <ruwan.lin...@gmail.com>
Subject Re: svn commit: r773818 - in /synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix: FIXApplicationFactory.java FIXSessionFactory.java FIXTransportListener.java FIXTransportSender.java
Date Tue, 12 May 2009 19:13:34 GMT
Hiranya,

If you can make the worker pool configurable that would be of much
importance... you may have a look at the nhttp transport thread pool, which
can be configurable via the nhttp.properties file.

Thanks,
Ruwan

On Tue, May 12, 2009 at 1:30 PM, <hiranya@apache.org> wrote:

> Author: hiranya
> Date: Tue May 12 08:00:27 2009
> New Revision: 773818
>
> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
> Log:
> Enhancements and code cleanup in the FIX transport:
> * FIX sender now has its own worker pool and hence does not rely on the FIX
> listener any more. Therefore listener and sender can be enabled individually
> * Made FIXSessionFactory a singleton to effectively share session data
> among the listener and the sender
> * Cleanup logic for initiators during sender shutdown
> * Minor bug fix at FIXSessionFactory for a bug which prevented the sample
> 259 and similar scenarios from operating properly
>
> Modified:
>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>
>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>
> Modified:
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> URL:
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>
> ==============================================================================
> ---
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> (original)
> +++
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
> Tue May 12 08:00:27 2009
> @@ -27,15 +27,12 @@
>  public class FIXApplicationFactory {
>
>     private ConfigurationContext cfgCtx;
> -    private WorkerPool workerPool;
> -
> -    public FIXApplicationFactory(ConfigurationContext cfgCtx, WorkerPool
> workerPool) {
>
> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
>         this.cfgCtx = cfgCtx;
> -        this.workerPool = workerPool;
>     }
>
> -    public Application getFIXApplication(AxisService service, boolean
> acceptor) {
> +    public Application getFIXApplication(AxisService service, WorkerPool
> workerPool, boolean acceptor) {
>         return new FIXIncomingMessageHandler(cfgCtx, workerPool, service,
> acceptor);
>     }
>  }
> \ No newline at end of file
>
> Modified:
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> URL:
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>
> ==============================================================================
> ---
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> (original)
> +++
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
> Tue May 12 08:00:27 2009
> @@ -23,6 +23,7 @@
>  import org.apache.axis2.description.AxisService;
>  import org.apache.axis2.description.Parameter;
>  import org.apache.axis2.transport.base.BaseUtils;
> +import org.apache.axis2.transport.base.threads.WorkerPool;
>  import org.apache.commons.logging.Log;
>  import org.apache.commons.logging.LogFactory;
>  import quickfix.*;
> @@ -65,16 +66,29 @@
>     /** A Map containing all the FIX applications created for initiators,
> keyed by FIX EPR */
>     private Map<String, Application> applicationStore;
>     /** An ApplicationFactory handles creating FIX Applications
> (FIXIncomingMessageHandler Objects) */
> -    private FIXApplicationFactory applicationFactory;
> +    private static FIXApplicationFactory applicationFactory = null;
> +
> +    private WorkerPool listenerThreadPool;
> +    private WorkerPool senderThreadPool;
>
>     private Log log;
>
> -    public FIXSessionFactory(FIXApplicationFactory applicationFactory) {
> -        this.applicationFactory = applicationFactory;
> +    private static FIXSessionFactory INSTANCE = new FIXSessionFactory();
> +
> +    public static FIXSessionFactory getInstance(FIXApplicationFactory af)
> {
> +        if (applicationFactory == null) {
> +            applicationFactory = af;
> +        }
> +        return INSTANCE;
> +    }
> +
> +    private FIXSessionFactory() {
>         this.log = LogFactory.getLog(this.getClass());
>         this.acceptorStore = new HashMap<String,Acceptor>();
>         this.initiatorStore = new HashMap<String, Initiator>();
>         this.applicationStore = new HashMap<String, Application>();
> +        this.listenerThreadPool = null;
> +        this.senderThreadPool = null;
>     }
>
>     /**
> @@ -101,7 +115,7 @@
>                 MessageFactory messageFactory = new
> DefaultMessageFactory();
>                 quickfix.LogFactory logFactory = getLogFactory(service,
> settings, true);
>                 //Get a new FIX Application
> -                Application messageHandler =
> applicationFactory.getFIXApplication(service, true);
> +                Application messageHandler =
> applicationFactory.getFIXApplication(service, listenerThreadPool, true);
>                 //Create a new FIX Acceptor
>                 Acceptor acceptor = new SocketAcceptor(
>                         messageHandler,
> @@ -174,7 +188,7 @@
>         MessageStoreFactory storeFactory = getMessageStoreFactory(service,
> settings, false);
>         MessageFactory messageFactory = new DefaultMessageFactory();
>         //Get a new FIX application
> -        Application messageHandler =
> applicationFactory.getFIXApplication(service, false);
> +        Application messageHandler =
> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>
>         try {
>            //Create a new FIX initiator
> @@ -216,7 +230,7 @@
>                 MessageFactory messageFactory = new
> DefaultMessageFactory();
>                 quickfix.LogFactory logFactory = getLogFactory(service,
> settings, true);
>                 //Get a new FIX Application
> -                Application messageHandler =
> applicationFactory.getFIXApplication(service, false);
> +                Application messageHandler =
> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>
>                 Initiator initiator = new SocketInitiator(
>                     messageHandler,
> @@ -246,10 +260,10 @@
>             }
>
>         } else {
> -            String msg = "The " +
> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
> -                    "not specified. Unable to initialize the initiator
> session at this stage.";
> -            log.info(msg);
> -            throw new AxisFault(msg);
> +            // FIX initiator session is not configured
> +            // It could be intentional - So not an error (we don't need
> initiators at all times)
> +            log.info("The " + FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM
> + " parameter is " +
> +                    "not specified. Unable to initialize the initiator
> session at this stage.");
>         }
>     }
>
> @@ -276,6 +290,24 @@
>     }
>
>     /**
> +     * Stops all the FIX initiators created so far and cleans up all the
> mappings
> +     * related to them
> +     */
> +    public void disposeFIXInitiators() {
> +        boolean debugEnabled = log.isDebugEnabled();
> +
> +        for (String key : initiatorStore.keySet()) {
> +            initiatorStore.get(key).stop();
> +            if (debugEnabled) {
> +                log.debug("FIX initiator to the EPR " + key + " stopped");
> +            }
> +        }
> +
> +        initiatorStore.clear();
> +        applicationStore.clear();
> +    }
> +
> +    /**
>      * Returns an array of Strings representing EPRs for the specified
> service
>      *
>      * @param serviceName the name of the service
> @@ -444,6 +476,14 @@
>         }
>         return app;
>     }
> +
> +    public void setListenerThreadPool(WorkerPool listenerThreadPool) {
> +        this.listenerThreadPool = listenerThreadPool;
> +    }
> +
> +    public void setSenderThreadPool(WorkerPool senderThreadPool) {
> +        this.senderThreadPool = senderThreadPool;
> +    }
>  }
>
>
>
> Modified:
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> URL:
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
>
> ==============================================================================
> ---
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> (original)
> +++
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
> Tue May 12 08:00:27 2009
> @@ -60,14 +60,8 @@
>                      TransportInDescription trpInDesc) throws AxisFault {
>
>         super.init(cfgCtx, trpInDesc);
> -        //initialize the FIXSessionFactory
> -        fixSessionFactory = new FIXSessionFactory(
> -                new FIXApplicationFactory(this.cfgCtx, this.workerPool));
> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
> -
>  getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
> -        if (sender != null) {
> -            sender.setSessionFactory(fixSessionFactory);
> -        }
> +        fixSessionFactory = FIXSessionFactory.getInstance(new
> FIXApplicationFactory(cfgCtx));
> +        fixSessionFactory.setListenerThreadPool(this.workerPool);
>         log.info("FIX transport listener initialized...");
>     }
>
>
> Modified:
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> URL:
> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
>
> ==============================================================================
> ---
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> (original)
> +++
> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
> Tue May 12 08:00:27 2009
> @@ -28,6 +28,8 @@
>  import org.apache.axis2.transport.OutTransportInfo;
>  import org.apache.axis2.transport.base.AbstractTransportSender;
>  import org.apache.axis2.transport.base.BaseUtils;
> +import org.apache.axis2.transport.base.threads.WorkerPool;
> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
>  import org.apache.commons.logging.LogFactory;
>  import quickfix.*;
>  import quickfix.field.*;
> @@ -51,17 +53,12 @@
>
>     private FIXSessionFactory sessionFactory;
>     private FIXOutgoingMessageHandler messageSender;
> +    private WorkerPool workerPool;
>
>     public FIXTransportSender() {
>         this.log = LogFactory.getLog(this.getClass());
>     }
>
> -
> -    public void setSessionFactory(FIXSessionFactory sessionFactory) {
> -        this.sessionFactory = sessionFactory;
> -        this.messageSender.setSessionFactory(sessionFactory);
> -    }
> -
>     /**
>      * @param cfgCtx       the axis2 configuration context
>      * @param transportOut the Out Transport description
> @@ -69,10 +66,25 @@
>      */
>     public void init(ConfigurationContext cfgCtx, TransportOutDescription
> transportOut) throws AxisFault {
>         super.init(cfgCtx, transportOut);
> +        this.sessionFactory = FIXSessionFactory.getInstance(new
> FIXApplicationFactory(cfgCtx));
> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
> +                            10, 20, 5, -1, "FIX Sender Worker thread
> group", "FIX-Worker");
> +        this.sessionFactory.setSenderThreadPool(this.workerPool);
>         messageSender = new FIXOutgoingMessageHandler();
> +        messageSender.setSessionFactory(this.sessionFactory);
>         log.info("FIX transport sender initialized...");
>     }
>
> +    public void stop() {
> +        try {
> +            this.workerPool.shutdown(10000);
> +        } catch (InterruptedException e) {
> +            log.warn("Thread interrupted while waiting for worker pool to
> shut down");
> +        }
> +        sessionFactory.disposeFIXInitiators();
> +        super.stop();
> +    }
> +
>     /**
>      * Performs the actual sending of the message.
>      *
>
>
>


-- 
Ruwan Linton
Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
WSO2 Inc.; http://wso2.org
email: ruwan@wso2.com; cell: +94 77 341 3097
blog: http://ruwansblog.blogspot.com

Mime
View raw message