activemq-users mailing list archives

From Adrian Tarau <...@daxtechnologies.com>
Subject Re: Session closed after a while(random)
Date Mon, 17 Jul 2006 21:40:02 GMT
I think I found the cause :)

Look at the stack trace below, which seems to be the source of our 
problems: after this exception we start getting "Session is closed". I 
have highlighted the important parts; see in particular the 
ActiveMQConnection.onException entry.
An unrecoverable transport error closes the sessions, consumers, 
listeners, etc., but does not close the connection.

My questions: how can I avoid getting the "Session is closed" error? How 
can I test whether the session is closed? And is cleaning up the 
connection like this the right way to handle a transport error?

I cannot test whether the session is closed (the ActiveMQSession.closed 
field is private). Alternatively, I can try to use the session and, if 
that fails, recreate it. Is this the right way to do it?
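
A minimal sketch of the "use the session and recreate it on failure" idea, using only the JDK so it can be run without a broker. Everything named here (RecreatingHolder, FakeSession, RecreateDemo) is hypothetical and not part of ActiveMQ or JMS, and java.lang.IllegalStateException stands in for javax.jms.IllegalStateException ("The Session is closed"). Whether a new session on the same connection is usable after a transport failure is not established in this thread, so in real code the factory might have to create a new connection as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical holder that lazily creates a session-like object and recreates it on failure. */
class RecreatingHolder<S> {
    private final Supplier<S> factory; // in real code this might wrap connection.createSession(...)
    private S current;

    RecreatingHolder(Supplier<S> factory) {
        this.factory = factory;
    }

    /** Runs work against the cached object; on IllegalStateException, recreates it and retries once. */
    synchronized <R> R call(Function<S, R> work) {
        if (current == null) {
            current = factory.get();
        }
        try {
            return work.apply(current);
        } catch (IllegalStateException closed) {
            current = factory.get();    // drop the dead object and build a new one
            return work.apply(current); // single retry; a second failure propagates
        }
    }
}

class RecreateDemo {
    /** Fake session (an assumption for the demo) that fails the way a disposed session would. */
    static class FakeSession {
        boolean closed;

        String createConsumer() {
            if (closed) {
                throw new IllegalStateException("The Session is closed");
            }
            return "consumer";
        }
    }

    /** Returns how many sessions were created when the first one is closed behind our back. */
    static int sessionsCreated() {
        List<FakeSession> created = new ArrayList<>();
        RecreatingHolder<FakeSession> holder = new RecreatingHolder<>(() -> {
            FakeSession s = new FakeSession();
            created.add(s);
            return s;
        });
        holder.call(FakeSession::createConsumer); // creates the first session
        created.get(0).closed = true;             // simulate the session being disposed
        holder.call(FakeSession::createConsumer); // fails, recreates, retries
        return created.size();
    }

    public static void main(String[] args) {
        System.out.println("sessions created: " + sessionsCreated());
    }
}
```

The retry is deliberately limited to one attempt so that a persistent failure still surfaces to the caller instead of looping.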

Thanks.

/**
 * An unrecoverable exception has occurred on the transport
 * @param error
 */
public void onException(final IOException error) {
    onAsyncException(error);
    asyncConnectionThread.execute(new Runnable() {
        public void run() {
            transportFailed(error);  // <-- important
            ServiceSupport.dispose(ActiveMQConnection.this.transport);
            brokerInfoReceived.countDown();

            for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
                TransportListener listener = (TransportListener) iter.next();
                listener.onException(error);
            }
        }
    });
}

and then transportFailed

protected void transportFailed(IOException error) {
    transportFailed.set(true);
    if (firstFailureError == null) {
        firstFailureError = error;
    }
    if (!closed.get() && !closing.get()) {
        try {
            cleanup();  // <-- important
        } catch (JMSException e) {
            log.warn("Cleanup failed", e);
        }
    }
}

and then cleanup

public void cleanup() throws JMSException {

    if (advisoryConsumer != null) {
        advisoryConsumer.dispose();
        advisoryConsumer = null;
    }

    for (Iterator i = this.sessions.iterator(); i.hasNext();) {
        ActiveMQSession s = (ActiveMQSession) i.next();
        s.dispose();  // <-- important: every session is disposed here
    }
    for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) {
        ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i.next();
        c.dispose();
    }
    for (Iterator i = this.inputStreams.iterator(); i.hasNext();) {
        ActiveMQInputStream c = (ActiveMQInputStream) i.next();
        c.dispose();
    }
    for (Iterator i = this.outputStreams.iterator(); i.hasNext();) {
        ActiveMQOutputStream c = (ActiveMQOutputStream) i.next();
        c.dispose();
    }

    if (isConnectionInfoSentToBroker) {
        if (!transportFailed.get() && !closing.get()) {
            asyncSendPacket(info.createRemoveCommand());
        }
        isConnectionInfoSentToBroker = false;
    }
    if (userSpecifiedClientID) {
        info.setClientId(null);
        userSpecifiedClientID = false;
    }
    clientIDSet = false;

    started.set(false);
}

Stack trace:

    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:45)
    at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1462)
    at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1478)    <-- important
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:94)
    at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:120)
    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:94)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
    at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:76)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
    at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:211)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:64)
    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:97)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
    at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:76)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
    at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:68)
    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:73)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1131)
    at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1667)
    at org.apache.activemq.ActiveMQMessageConsumer.<init>(ActiveMQMessageConsumer.java:196)
    at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:840)
    at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:800)
    at com.daxtechnologies.sams.manager.JmsSessionWrapper.createConsumer(JmsSessionWrapper.java:106)
    at com.daxtechnologies.sams.manager.AbstractSamsManagerQueue.consume(AbstractSamsManagerQueue.java:133)
    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.consume(SamsManagerQueues.java:602)
    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.throttleQueue(SamsManagerQueues.java:571)
    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.run(SamsManagerQueues.java:622)
    at com.daxtechnologies.sams.manager.SamsManagerThreadPool$WorkerThead.run(SamsManagerThreadPool.java:188)
Caused by: java.io.InterruptedIOException: Interrupted.
    at org.apache.activemq.transport.FutureResponse.set(FutureResponse.java:56)
    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:89)
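
The "Caused by: java.io.InterruptedIOException" at the bottom of this trace is consistent with the watchdog described in the quoted message below, which calls Thread.interrupt() on a consumer thread that has not responded within 60 seconds. The following JDK-only sketch illustrates the general mechanism: an interrupt delivered to a thread blocked in a request can surface as an I/O error. InterruptDemo and blockingRequest are hypothetical names chosen for the illustration; this is not ActiveMQ's code, and the real transport may behave differently.

```java
import java.io.InterruptedIOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptDemo {
    /** Hypothetical stand-in for a synchronous request that waits for a response. */
    static String blockingRequest(BlockingQueue<String> responses) throws InterruptedIOException {
        try {
            return responses.take(); // blocks until a response arrives
        } catch (InterruptedException e) {
            // An I/O layer may translate the interrupt into an I/O failure like this.
            throw new InterruptedIOException("Interrupted.");
        }
    }

    /** Returns true if a watchdog-style interrupt shows up in the worker as an I/O error. */
    static boolean interruptSurfacesAsIoError() {
        BlockingQueue<String> responses = new ArrayBlockingQueue<>(1); // never filled
        AtomicBoolean sawIoError = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                blockingRequest(responses);
            } catch (InterruptedIOException e) {
                sawIoError.set(true);
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive if something goes wrong
        worker.start();
        worker.interrupt();     // what a watchdog would do to a slow consumer
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawIoError.get();
    }

    public static void main(String[] args) {
        System.out.println("I/O error seen: " + interruptSurfacesAsIoError());
    }
}
```

If this is indeed what happens here, a watchdog that relies on a timeout instead of interrupting a thread in the middle of a broker call might avoid triggering the transport error path; that is an assumption, not something verified in this thread.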

Adrian Tarau wrote:
> Even more :)
>
> As I told you, I created a Session wrapper over the ActiveMQ session 
> in order to catch the close() call.
>
> You can see the close method below:
>
> public void close() throws JMSException {
>     try {
>         IllegalAccessError illegalAccessError =
>                 new IllegalAccessError("I don't want to close");
>         StringWriter sw = new StringWriter();
>         illegalAccessError.printStackTrace(new PrintWriter(sw));
>         LOG.error("Somebody closed the session, let's see who\n\n" + sw.toString());
>         System.out.println("Somebody closed the session, let's see who\n\n" + sw.toString());
>     } finally {
>         session.close();
>     }
> }
>
> Well, I looked in the error log and in the container log (JBoss) and 
> did not find any of these "Somebody ..." strings. The conclusion is 
> that nobody called close() explicitly on the session.
> I deployed this on 3 systems, and over the weekend the problem occurred 
> on all of them. One thing the systems have in common is that they are 
> all heavily loaded (they are test machines), and on all of them 
> everything started to fail with "Session is closed" after the exception 
> shown below.
>
> I don't know why we got java.io.InterruptedIOException; it could be 
> because of me :). We interrupt the current thread (the one consuming 
> the message) if it has not responded after 60 seconds (a kind of 
> watchdog), so it may be because I explicitly call Thread.interrupt().
>
> Anyway, I think this should not affect the session's state: the closed 
> field should not be changed by any code other than close(), right? It 
> should not be affected by an interrupt exception in a consumer.
>
> That's all I could find out about this session-closed problem. Anyway, 
> I think I will create a new session every time to avoid it.
>
> Thanks.
>
>
> 2006-07-15 03:24:54,628 [AcitveMQ Connection Worker: vm://localhost#0] 
> ERROR com.daxtechnologies.sams.manager.SamsManagerJMS - JMS Error
> javax.jms.JMSException: Interrupted.
>    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:45)
>    at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1462)
>    at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1478)
>    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:94)
>    at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:120)
>    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:94)
>    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
>    at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:76)
>    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
>    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
>    at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:211)
>    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:64)
>    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:97)
>    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
>    at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:76)
>    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
>    at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:68)
>    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:73)
>    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1131)
>    at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1667)
>    at org.apache.activemq.ActiveMQMessageConsumer.<init>(ActiveMQMessageConsumer.java:196)
>    at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:840)
>    at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:800)
>    at com.daxtechnologies.sams.manager.JmsSessionWrapper.createConsumer(JmsSessionWrapper.java:106)
>    at com.daxtechnologies.sams.manager.AbstractSamsManagerQueue.consume(AbstractSamsManagerQueue.java:133)
>    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.consume(SamsManagerQueues.java:602)
>    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.throttleQueue(SamsManagerQueues.java:571)
>    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.run(SamsManagerQueues.java:622)
>    at com.daxtechnologies.sams.manager.SamsManagerThreadPool$WorkerThead.run(SamsManagerThreadPool.java:188)
> Caused by: java.io.InterruptedIOException: Interrupted.
>    at org.apache.activemq.transport.FutureResponse.set(FutureResponse.java:56)
>    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:89)
>    ... 23 more
> 2006-07-15 03:24:54,805 [SAMS-Worker-queue.contact_type-3] ERROR 
> com.daxtechnologies.sams.manager.AbstractSamsManagerQueue - Cannot 
> consume message from queue queue 'Contact Queue 
> <system/ady@daxtechnologies.com>' javax.jms.JMSException: Interrupted.
>    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:57)
>    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1141)
>    at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1667)
>    at org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:516)
>    at com.daxtechnologies.sams.manager.AbstractSamsManagerQueue.consume(AbstractSamsManagerQueue.java:145)
>    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.consume(SamsManagerQueues.java:602)
>    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.throttleQueue(SamsManagerQueues.java:571)
>    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.run(SamsManagerQueues.java:622)
>    at com.daxtechnologies.sams.manager.SamsManagerThreadPool$WorkerThead.run(SamsManagerThreadPool.java:188)
> Caused by: java.io.InterruptedIOException: Interrupted.
>    at org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
>    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:74)
>    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1131)
>    ... 7 more
>
> Adrian Tarau wrote:
>> More about closed session :)
>>
>> Could the closing problem occur because the Session is not per 
>> thread, even though it is accessed by only one thread at a time (or 
>> should be, but sometimes is not: backport-concurrent problems)?
>>
>> The code looks like this:
>> class MyStorageQueue {
>>     private Session session;
>>     private Queue queue;
>>     protected final Semaphore lock = new Semaphore(1, true);
>>     ....
>>     public Object consume() {
>>         ....
>>     }
>>
>>     public void publish(Object object) {
>>         ....
>>     }
>> }
>>
>> and its usage from different threads:
>> MyStorageQueue queue = getNextQueue(...);
>> if (queue.lock.tryAcquire()) {
>>     try {
>>         Object value = consume(queue);
>>         .....
>>     } finally {
>>         queue.lock.release();
>>     }
>> }
>>
>> Anyway, I created a wrapper over the ActiveMQ session that dumps the 
>> stack trace to show who calls the close method.
>>
>>
>>
>> Adrian Tarau wrote:
>>> Debugging remotely is not a problem; the problem is that I would 
>>> need to start the application in debug mode and keep the IDE attached 
>>> with a breakpoint for days until it happens.
>>>
>>> I think I will recompile ActiveMQ and dump the thread stack 
>>> trace :)
>>>
>>> Sanjiv Jivan wrote:
>>>> You can attach your debugger to the Java process on the remote 
>>>> machine and set a breakpoint on the session.close() call. Once the 
>>>> breakpoint is hit, you can examine the call stack in your debugger.
>>>>
>>>> Use the JPDA socket JVM args when starting your Java process, e.g.:
>>>> -Xdebug -Xnoagent -Djava.compiler=NONE 
>>>> -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005
>>>>
>>>> On 7/13/06, Adrian Tarau <ady@daxtechnologies.com> wrote:
>>>>>
>>>>> That's great, but this usually happens on the remote machine, after 
>>>>> a few days :)
>>>>>
>>>>> James Strachan wrote:
>>>>> > Maybe try running your code in a debugger and set breakpoints in the
>>>>> > ActiveMQSession class to see where & why the session gets closed? (The
>>>>> > code's pretty easy to follow, it's a relatively simple class).
>>>>> >
>>>>> > On 7/13/06, Adrian Tarau <ady@daxtechnologies.com> wrote:
>>>>> >> James, do you want the sources, or can you recommend some actions?
>>>>> >> Should I create the session every time instead of caching it?
>>>>> >>
>>>>> >> Adrian Tarau wrote:
>>>>> >> > Ok, so who closes the session? :)
>>>>> >> >
>>>>> >> > James Strachan wrote:
>>>>> >> >> Sorry I forgot about that :)
>>>>> >> >>
>>>>> >> >> The only time a VM broker is shut down is explicitly via
>>>>> >> >> application code or a shutdown handler.
>>>>> >> >>
>>>>> >> >> On 7/13/06, Adrian Tarau <ady@daxtechnologies.com> wrote:
>>>>> >> >>> I use the vm connector, so no connection problems should be
>>>>> >> >>> involved, right?
>>>>> >> >>>
>>>>> >> >>> James Strachan wrote:
>>>>> >> >>> > BTW are you using auto-reconnect? It could be that the
>>>>> >> >>> > socket is terminated due to some network issue?
>>>>> >> >>> >
>>>>> >> >>> > http://incubator.apache.org/activemq/how-can-i-support-auto-reconnection.html
>>>>> >> >>> >
>>>>> >> >>> > On 7/13/06, Adrian Tarau <ady@daxtechnologies.com> wrote:
>>>>> >> >>> >> I searched for "close" in the source code and, except for
>>>>> >> >>> >> producers, consumers, input streams and output streams, I
>>>>> >> >>> >> don't close the connection or session, except on JVM
>>>>> >> >>> >> shutdown (Thread hook).
>>>>> >> >>> >>
>>>>> >> >>> >> I can provide you (privately, not on the mailing list) the
>>>>> >> >>> >> source code, because this is very annoying.
>>>>> >> >>> >> Thanks.
>>>>> >> >>> >>
>>>>> >> >>> >> James Strachan wrote:
>>>>> >> >>> >> > Apart from inactivity timeouts on the transport layer, we
>>>>> >> >>> >> > generally don't close sessions. Are you sure nothing in
>>>>> >> >>> >> > your application code is trying to close the session?
>>>>> >> >>> >> >
>>>>> >> >>> >> > On 7/13/06, Adrian Tarau <ady@daxtechnologies.com> wrote:
>>>>> >> >>> >> >> I have had this issue for some time with ActiveMQ 4.0 and
>>>>> >> >>> >> >> 4.0.1.
>>>>> >> >>> >> >>
>>>>> >> >>> >> >> I use the vm transport and create one session used to
>>>>> >> >>> >> >> produce and consume messages. Everything works fine for
>>>>> >> >>> >> >> days in a row, until it starts to throw the exception
>>>>> >> >>> >> >> "Session is closed". Are there any watchdogs that close
>>>>> >> >>> >> >> sessions after a while, based on some criteria?
>>>>> >> >>> >> >>
>>>>> >> >>> >> >> I couldn't find any rule for when or why it happens.
>>>>> >> >>> >> >> Should I create the session every time? I understood
>>>>> >> >>> >> >> that this is time-consuming and that it should be safe to
>>>>> >> >>> >> >> cache it.
>>>>> >> >>> >> >>
>>>>> >> >>> >> >> Thanks.
>>>>> >> >>> >> >>
>>>>> >> >>> >> >> javax.jms.IllegalStateException: The Session is closed
>>>>> >> >>> >> >>     at org.apache.activemq.ActiveMQSession.checkClosed(ActiveMQSession.java:577)
>>>>> >> >>> >> >>     at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:799)


