activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From activemqnewbie <acqua...@yahoo.co.in>
Subject Re: Activemq.xml configuration settings directory
Date Tue, 22 Jan 2008 16:57:26 GMT

Below is the source for MessageConsumer.java


package ss.integration.util;

import javax.jms.TopicConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.MessageListener;
import javax.jms.JMSException;

import ss.util.logging.Logger;
import ss.util.SystemException;
import ss.util.JNDILookup;

import ss.integration.util.ExListener;

public class MessageConsumer
{

    private TopicConnectionFactory _topicConnectionFactory;
    private TopicConnection _topicConnection;
    private Topic _topic;
    private TopicSession _session;
    private TopicSubscriber _subscriber;

    private String _strClientID;
    private String _strTopicConnectionFactory;
    private String _strTopic;
    private String _strSelector;
    private String _strContext;
    private String _strURL;

    private int _reconnectRetry;
    private long _reconnectTime;
    private boolean _firstTimeConnection;

    // flag to indicated that a shutdown has been requested
    private boolean _shutdownRequested;

    private static ExListener _exListener;
    private static MessageListener _msgListener;


    public MessageConsumer(String clientID_,
                           String topicConnectionFactory_,
                           String topic_,
                           String selector_,
                           String context_,
                           String url_) throws SystemException
    {
        _strClientID = clientID_;
        _strTopicConnectionFactory = topicConnectionFactory_;
        _strTopic = topic_;
        _strSelector = selector_;
        _strContext = context_;
        _strURL = url_;

        _exListener = null;
        _msgListener = null;

        _reconnectRetry = 0;        // reconnect is currently not used.
        _reconnectTime = 1 * 1000;
        _firstTimeConnection = true;

        _shutdownRequested = false;
    }

    public void init() throws SystemException
    {
        try
        {
            Logger.trace("MessageConsumer, setting the topicConnection,
client: " + _strClientID);

            _topicConnectionFactory =
               
(TopicConnectionFactory)JNDILookup.lookup(_strTopicConnectionFactory,
_strContext, _strURL);

            _topicConnection =
_topicConnectionFactory.createTopicConnection();

            _topicConnection.setClientID(_strClientID);

            _topic = (Topic)JNDILookup.lookup(_strTopic, _strContext,
_strURL);

            _session = _topicConnection.createTopicSession(true, 0);

            _subscriber = _session.createDurableSubscriber(_topic,
                                                           _strClientID,
                                                           _strSelector,
                                                           true);
Logger.trace("_topic=" + _topic + " _strClientID=" + _strClientID +
"  _strSelector = " + _strSelector);
Logger.trace("_subscriber=" + _subscriber);

            if (_exListener == null)
            {
                _exListener = new ExListener(this);
            }
        }
        catch (JMSException je)
        {
            Logger.trace("MessageConsumer init() failed: JMSException");
            throw new SystemException(je);
        }
        catch (SystemException se)
        {
            Logger.trace("MessageConsumer init() failed: SystemException");
            throw new SystemException(se);
        }
        catch (Exception e)
        {
            Logger.trace("MessageConsumer init() failed: Exception");
            throw new SystemException(e);
        }
    }


    public void start() throws SystemException
    {
        synchronized (this)
        {
            while (!_shutdownRequested)
            {
                try
                {
                    startConnection();

                    Logger.trace("Wait()..." );
                    wait();
                    Logger.trace("Waiting thread awaken" );
                }
                catch (Exception e)
                {
                    Logger.error("Error in MessageConsumer.start: " +
e.getMessage());
                    throw new SystemException(e);
                }
                finally
                {
                    close();
                }
            }
        }
    }



    private void signalShutdown()
    {
        Logger.trace("Shutting down...");

        _shutdownRequested = true;

        notifyWaitingThread();
    }



    // to signal a waiting thread (waiting inside start() method)
    public synchronized void notifyWaitingThread()
    {
        unregisterListener();

        Logger.trace("Notifying/signaling a waiting thread...");
        notify();
    }


    public void setListener (MessageListener listener_)
    {
        _msgListener = listener_;
    }


    private void registerMsgListener() throws SystemException
    {
        if (_msgListener != null)
        {
            try
            {
                Logger.trace("Registering Message Listener...");

                // install asynchronous listener
                _subscriber.setMessageListener(_msgListener);
            }
            catch (JMSException je)
            {
                throw new SystemException(je);

            }
            catch (Exception e)
            {
                throw new SystemException(e);
            }
        }
        else
        {
            Logger.warning("MessageListener is null");
        }
    }


    private void unregisterListener()
    {
        try
        {
            Logger.trace("Unregistering Message Listener...");
            _subscriber.setMessageListener(null);
        }
        catch (Exception e)
        {
            Logger.error(e);
        }
    }


    private void registerExceptionListener() throws SystemException
    {
        if (_exListener != null)
        {
            try
            {
                Logger.trace("Registering JMS Exception Listener...");

                // install asynchronous listener
                _topicConnection.setExceptionListener(_exListener);
            }
            catch (JMSException je)
            {
                throw new SystemException(je);
            }
            catch (Exception e)
            {
                throw new SystemException(e);
            }
        }
        else
        {
            Logger.warning("ExceptionListener is null");
        }
    }


    public void doCommit () throws SystemException
    {
        try
        {
            _session.commit();
        }
        catch (JMSException je)
        {
            throw new SystemException(je);
        }
        catch (Exception e)
        {
            throw new SystemException(e);
        }
    }


    public void doRollback () throws SystemException
    {
        try
        {
            Logger.trace("doRollback()  Rolling-back message...");
            _session.rollback();                            // Be careful
when using this. Sometimes the rollback doesn't work.
            Logger.trace("doRollback()  Message rolled-back");

            signalShutdown();
        }
        catch (JMSException je)
        {
            Logger.trace("doRollback() JMS Exception: " + je.getMessage());
            throw new SystemException(je);
        }
        catch (Exception e)
        {
            Logger.trace("doRollback() Exception :" + e.getMessage());
            throw new SystemException(e);
        }
    }



    public void close() throws SystemException
    {
        try
        {
            if (_subscriber != null)
            {
                Logger.trace("closing subscriber...");
                _subscriber.close();
                _subscriber = null;
                Logger.trace("subscriber closed");
            }

            if (_session != null)
            {
                Logger.trace("closing session...");
                _session.close();
                _session = null;
                Logger.trace("session closed");
            }

            if (_topicConnection != null)
            {
                Logger.trace("closing topic connection...");
                _topicConnection.close();
                _topicConnection = null;
                Logger.trace("topicConnection closed");
            }

            _topicConnectionFactory = null;
            _topic = null;

        }
        catch (Exception e)
        {
            Logger.trace("Exception in close()");
        }
    }


    private void startConnection() throws SystemException
    {
        boolean success = false;

        for (int i=0; i<_reconnectRetry || _firstTimeConnection; i++)
        {
            _firstTimeConnection = false;

            try
            {
                Logger.trace("Closing all connections... ");
                close();

                if (_exListener != null)
                {
                    wait(_reconnectTime);
                }

                Logger.trace("Starting all connections... (" + (i+1) + "/" +
_reconnectRetry + ")");
                init();
                Logger.trace("Init successfull");

                registerMsgListener();
                Logger.trace("RegisterMsgListener successfull");

                registerExceptionListener();
                Logger.trace("RegisterExceptionListener successfull");

                Logger.trace("Starting TopicConnection...");
                _topicConnection.start();
                Logger.trace("TopicConnection started...");

                success = true;

                break;
            }
            catch (Exception e)
            {
                Logger.error("startConnection() failed");
                Logger.error(e);
            }
        }

        if (!success)
        {
            close();
            Logger.error("Unable to connect to the JMS Server");
            throw new SystemException("Unable to connect to the JMS
Server");
        }
    }
}


James.Strachan wrote:
> 
> The exception is being generated from the code in this package :
> ss.integration.util
> 
> I've no idea what that is - got the source?
> 
> On 22/01/2008, activemqnewbie <acquaj29@yahoo.co.in> wrote:
>>
>> We are getting a NullPointerException and the consumer shuts off.
>> After receiving a certain number of messages, the consumer
>> automatically closes, throwing a NullPointerException.
>> Activemq 5
>> Oracle10g
>>
>> Below is the stack trace.
>>
>> <ERROR><2008/01/22 09:52:56.470><23459640><> ExceptionListener
receives a
>> JMS exception
>>
>> <TRACE><2008/01/22 09:52:56.470><23459640><> Unregistering
Message
>> Listener...
>>
>> <TRACE><2008/01/22 09:52:56.470><23459640><> Notifying/signaling
a
>> waiting
>> thread...
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> Waiting thread
awaken
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> closing subscriber...
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> subscriber closed
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> closing session...
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> session closed
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> closing topic
connection...
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> Exception in
close()
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> closing topic
connection...
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> Exception in
close()
>>
>> <ERROR><2008/01/22 09:52:56.470><20688146><> Unable to connect
to the JMS
>> Server
>>
>> <ERROR><2008/01/22 09:52:56.470><23459640><> ExceptionListener
receives a
>> JMS exception
>>
>> <ERROR><2008/01/22 09:52:56.470><20688146><> Error in
>> MessageConsumer.start:
>> Unable to connect to the JMS Server
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> closing topic
connection...
>>
>> <TRACE><2008/01/22 09:52:56.470><20688146><> Exception in
close()
>>
>> <TRACE><2008/01/22 09:52:56.470><23459640><> Unregistering
Message
>> Listener...
>> <ERROR><2008/01/22 09:52:56.470><20688146><> Error in main:
Unable to
>> connect to the JMS Server
>>
>>
>> <ERROR><2008/01/22 09:52:56.470><20688146><> StackTrace:
>>
>> <ERROR><2008/01/22 09:52:56.470><20688146><>
>> Error Code: -1
>> k12.util.SystemException: Unable to connect to the JMS Server
>>         at
>> ss.integration.util.MessageConsumer.startConnection(MessageConsumer.java:368)
>>         at
>> ss.integration.util.MessageConsumer.start(MessageConsumer.java:124)
>>         at
>> ss.integration.agent.sams.SInputStarter.start(SInputStarter.java:113)
>>         at
>> ss.integration.agent.sams.SInputStarter.main(SInputStarter.java:45)
>>
>> <ERROR><2008/01/22 09:52:56.470><23459640><>
>> java.lang.NullPointerException
>>         at
>> ss.integration.util.MessageConsumer.unregisterListener(MessageConsumer.java:205)
>>         at
>> ss.integration.util.MessageConsumer.notifyWaitingThread(MessageConsumer.java:159)
>>         at ss.integration.util.ExListener.onException(ExListener.java:26)
>>         at
>> org.apache.activemq.ActiveMQConnection$3.run(ActiveMQConnection.java:1648)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>>         at java.lang.Thread.run(Thread.java:595)
>>
>>
>>
>> <TRACE><2008/01/22 09:52:56.470><23459640><> Notifying/signaling
a
>> waiting
>> thread...
>>
>>
>>
>> Thanks for the help,
>> Vij
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://www.nabble.com/Activemq.xml-configuration-settings-directory-tp14957812s2354p15020252.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
> 
> 
> -- 
> James
> -------
> http://macstrac.blogspot.com/
> 
> Open Source Integration
> http://open.iona.com
> 
> 

-- 
View this message in context: http://www.nabble.com/Activemq.xml-configuration-settings-directory-tp14957812s2354p15023059.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message