activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Meeraj Kunnumpurath <mkunnumpur...@googlemail.com>
Subject Connection Consumer and Server Session Pools
Date Sat, 15 Sep 2007 00:59:41 GMT

Hi,

I am trying to write a standalone JMS host that can transactionally receive
messages from ActiveMQ using JTA. I have implemented ServerSessionPool and
ServerSession and created a ConnectionConsumer using the JMS Connection by
passing the instance of the ServerSessionPool implementation. However, I am
unable to receive any messages despite starting the connection (the start
method on the ServerSession implementation doesn't seem to get called).

Any pointers would be highly appreciated,

1. TransactionHandler

/**
 * Strategy for demarcating the transaction in which a JMS session does its work.
 *
 * <p>Each operation receives the session whose work is being demarcated and
 * reports failure through {@link JmsTxException}.
 */
public interface TransactionHandler {

    /**
     * Starts a transaction for the given session.
     *
     * @param session session whose work is about to start
     * @throws JmsTxException if the transaction cannot be started
     */
    void begin(Session session) throws JmsTxException;

    /**
     * Commits the transaction associated with the given session.
     *
     * @param session session whose work has completed
     * @throws JmsTxException if the commit fails
     */
    void commit(Session session) throws JmsTxException;

    /**
     * Rolls back the transaction associated with the given session.
     *
     * @param session session whose work is being abandoned
     * @throws JmsTxException if the rollback fails
     */
    void rollback(Session session) throws JmsTxException;
}

2. JtaTransactionHandler

public class JtaTransactionHandler implements TransactionHandler {
    
    private TransactionManager transactionManager;
    
    public void setTransactionManager(TransactionManager transactionManager)
{
        this.transactionManager = transactionManager;
    }

    public void begin(Session session) throws JmsTxException {       
        try {            
            transactionManager.begin();            
            if(!(session instanceof XASession)) {
                throw new JmsTxException("XA session required for global
transactions");
            }            
            XASession xaSession = (XASession) session;
            XAResource xaResource = xaSession.getXAResource();            
            transactionManager.getTransaction().enlistResource(xaResource);
        } catch (Exception e) {
            throw new JmsTxException(e);
        }        
    }

    public void commit(Session session) throws JmsTxException {        
        try {
            transactionManager.commit();
        } catch (Exception e) {
            throw new JmsTxException(e);
        }
    }

    public void rollback(Session session) throws JmsTxException {        
        try {
            transactionManager.rollback();
        } catch (Exception e) {
            throw new JmsTxException(e);
        }        
    }

}

3. ServerSession implementation

/**
 * {@link ServerSession} that runs its JMS session inside a transaction
 * demarcated by a {@link TransactionHandler} and hands itself back to the
 * owning pool once the run has finished.
 */
public class StandaloneServerSession implements ServerSession {

    private final StandaloneServerSessionPool serverSessionPool;
    private final Session session;
    private final TransactionHandler transactionHandler;

    /**
     * Creates a server session.
     *
     * @param session JMS session that processes the messages
     * @param serverSessionPool pool this server session is returned to after each run
     * @param transactionHandler handler that demarcates the transaction around each run
     */
    public StandaloneServerSession(Session session,
                                   StandaloneServerSessionPool serverSessionPool,
                                   TransactionHandler transactionHandler) {
        this.session = session;
        this.serverSessionPool = serverSessionPool;
        this.transactionHandler = transactionHandler;
    }

    /**
     * Returns the wrapped JMS session.
     *
     * @return the session passed to the constructor
     */
    public Session getSession() throws JMSException {
        return session;
    }

    /**
     * Runs the session inside a transaction on the calling thread.
     *
     * <p>The transaction is committed when {@code session.run()} completes and
     * rolled back when the run or the commit fails for any reason. No rollback
     * is attempted when the transaction could not be started. The server
     * session is returned to the pool in every case.
     *
     * @throws JMSException declared by the {@link ServerSession} contract
     */
    public void start() throws JMSException {
        boolean begun = false;
        boolean committed = false;
        try {
            transactionHandler.begin(session);
            begun = true;
            session.run();
            transactionHandler.commit(session);
            committed = true;
        } finally {
            try {
                // Flags instead of a catch clause so that every kind of failure
                // (not only RuntimeException) triggers the rollback.
                if (begun && !committed) {
                    transactionHandler.rollback(session);
                }
            } finally {
                serverSessionPool.returnSession(this);
            }
        }
    }

}

4. ServerSessionPool implementation

public class StandaloneServerSessionPool implements ServerSessionPool {
    
    // Available server sessions
    private Stack<ServerSession> serverSessions = new
Stack<ServerSession>();
    
    /**
     * Initializes the server sessions.
     * @param serverSessions Server sessions.
     */
    public StandaloneServerSessionPool(List<Session> sessions, 
                                                     TransactionHandler
transactionHandler) {
        for(Session session : sessions) {
            ServerSession serverSession = 
                new StandaloneServerSession(session, this,
transactionHandler);
            this.serverSessions.push(serverSession);
        }
    }
    
    public void stop() throws JMSException {
        ServerSession serverSession = null;
        while((serverSession = getServerSession()) != null) {
            serverSession.getSession().close();
        }
    }

    public ServerSession getServerSession() throws JMSException {        
        synchronized (serverSessions) {            
            while(serverSessions.isEmpty()) {
                try {
                    serverSessions.wait();
                } catch (InterruptedException e) {
                    throw new JMSException("Unable to get a server
session");
                }
            }
            return serverSessions.pop();            
        }        
    }
    
    protected void returnSession(ServerSession serverSession) {        
        synchronized (serverSessions) {
            serverSessions.push(serverSession);
            serverSessions.notify();
        }        
    }

}

5. Code to create the connection consumer

    /**
     * Creates a connection, one session per listener and a connection consumer
     * that dispatches messages from the destination to those sessions through a
     * {@link StandaloneServerSessionPool}, then starts the connection.
     *
     * <p>If any step fails after the connection was created, the connection is
     * closed again before the failure is reported.
     *
     * @param destination destination to consume from
     * @param connectionFactory factory used to create the connection
     * @param listeners listeners to register; one session is created for each
     * @param transactionType selects the transaction handler and whether the
     *        sessions are locally transacted
     * @throws RuntimeException wrapping the {@link JMSException} if activation fails
     */
    public void registerListener(Destination destination,
                                 ConnectionFactory connectionFactory,
                                 List<MessageListener> listeners,
                                 TransactionType transactionType) {

        // Tracks whether the connection field refers to the connection created by THIS call.
        boolean connectionCreated = false;
        try {

            connection = connectionFactory.createConnection();
            connectionCreated = true;

            // Sessions are locally transacted for every type except GLOBAL.
            boolean transacted = transactionType != TransactionType.GLOBAL;
            // SESSION_TRANSACTED is only meaningful for a transacted session;
            // a non-transacted session needs a real acknowledge mode.
            int acknowledgeMode = transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE;

            // NOTE(review): for GLOBAL these are plain sessions from Connection.createSession;
            // a handler that requires an XASession will presumably reject them - confirm
            // whether an XAConnection/XASession should be created here instead.
            List<Session> sessions = new LinkedList<Session>();
            for (MessageListener listener : listeners) {
                Session session = connection.createSession(transacted, acknowledgeMode);
                session.setMessageListener(listener);
                sessions.add(session);
            }

            TransactionHandler transactionHandler = transactionHandlers.get(transactionType);
            StandaloneServerSessionPool serverSessionPool =
                new StandaloneServerSessionPool(sessions, transactionHandler);

            connection.createConnectionConsumer(destination, null, serverSessionPool, 1);
            connection.start();

        } catch (JMSException ex) {
            if (connectionCreated) {
                try {
                    connection.close();
                } catch (JMSException ignored) {
                    // The activation failure reported below is the error that matters.
                }
            }
            throw new RuntimeException("Unable to activate service", ex);
        }

    }

Kind Regards
Meeraj

-- 
View this message in context: http://www.nabble.com/Connection-Consumer-and-Server-Session-Pools-tf4446061s2354.html#a12686031
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message