activemq-users mailing list archives

From Adrian Tarau <...@daxtechnologies.com>
Subject Re: Session closed after a while(random)
Date Tue, 18 Jul 2006 14:08:32 GMT
Any thoughts?

Adrian Tarau wrote:
> I think I found the cause. :)
>
> Look at the stack trace below, which seems to be the source of our
> problems: after this exception we start getting "Session is closed"
> errors. I have marked the important entry in bold, the one in
> ActiveMQConnection.onException.
> An unrecoverable transport error closes the sessions, consumers,
> listeners, etc., but doesn't close the connection.
>
> My question is: how can I avoid getting the "Session is closed" message?
> How can I test whether the session is closed? Is cleaning up the
> connection the right way to handle a transport error?
>
> I cannot test whether the session is closed (the ActiveMQSession.closed
> field is private). I could try to do something with the session and, if
> that fails, recreate the session. Is this the right way to do it?
>
> Thanks.
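
A minimal sketch of the "try the session and recreate it on failure" idea from the question above. Everything here is an assumption for illustration: RecreatingHolder and its Action interface are hypothetical names, not part of ActiveMQ or JMS, and the holder treats any exception as a sign that the cached resource is unusable. It uses only the JDK (Java 8 or later), so it says nothing about how ActiveMQ itself behaves.

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical helper (not an ActiveMQ or JMS class): caches a resource such
 * as a JMS Session and recreates it once when an operation on it fails.
 */
class RecreatingHolder<T> {

    /** An operation against the cached resource that may throw. */
    interface Action<S, R> {
        R apply(S resource) throws Exception;
    }

    private final Callable<T> factory;
    private T current;

    RecreatingHolder(Callable<T> factory) {
        this.factory = factory;
    }

    /** Runs the action; on failure, replaces the cached resource and retries once. */
    synchronized <R> R execute(Action<T, R> action) throws Exception {
        if (current == null) {
            current = factory.call();
        }
        try {
            return action.apply(current);
        } catch (Exception firstFailure) {
            // Assumption: any failure means the cached resource is unusable.
            // A second failure is propagated to the caller unchanged.
            current = factory.call();
            return action.apply(current);
        }
    }
}
```

With JMS, the factory could be something like connection.createSession(false, Session.AUTO_ACKNOWLEDGE) and the action session.createConsumer(queue). Whether the connection itself is still usable after the cleanup() quoted below is not settled in this thread, so the factory may also need to recreate the connection.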
>
> /**
>  * An unrecoverable exception has occurred on the transport
>  * @param error
>  */
> public void onException(final IOException error) {
>     onAsyncException(error);
>     asyncConnectionThread.execute(new Runnable() {
>         public void run() {
>             *transportFailed(error);*
>             ServiceSupport.dispose(ActiveMQConnection.this.transport);
>             brokerInfoReceived.countDown();
>             for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
>                 TransportListener listener = (TransportListener) iter.next();
>                 listener.onException(error);
>             }
>         }
>     });
> }
>
> and then transportFailed
>
> protected void transportFailed(IOException error) {
>     transportFailed.set(true);
>     if (firstFailureError == null) {
>         firstFailureError = error;
>     }
>     if (!closed.get() && !closing.get()) {
>         try {
>             *cleanup();*
>         } catch (JMSException e) {
>             log.warn("Cleanup failed", e);
>         }
>     }
> }
>
> and then cleanup
>
> public void cleanup() throws JMSException {
>     if (advisoryConsumer != null) {
>         advisoryConsumer.dispose();
>         advisoryConsumer = null;
>     }
>     for (Iterator i = this.sessions.iterator(); i.hasNext();) {
>         *ActiveMQSession s = (ActiveMQSession) i.next();
>         s.dispose();*
>     }
>     for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) {
>         ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i.next();
>         c.dispose();
>     }
>     for (Iterator i = this.inputStreams.iterator(); i.hasNext();) {
>         ActiveMQInputStream c = (ActiveMQInputStream) i.next();
>         c.dispose();
>     }
>     for (Iterator i = this.outputStreams.iterator(); i.hasNext();) {
>         ActiveMQOutputStream c = (ActiveMQOutputStream) i.next();
>         c.dispose();
>     }
>
>     if (isConnectionInfoSentToBroker) {
>         if (!transportFailed.get() && !closing.get()) {
>             asyncSendPacket(info.createRemoveCommand());
>         }
>         isConnectionInfoSentToBroker = false;
>     }
>     if (userSpecifiedClientID) {
>         info.setClientId(null);
>         userSpecifiedClientID = false;
>     }
>     clientIDSet = false;
>
>     started.set(false);
> }
>
> *Stack trace*
>
>    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:45)
>    at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1462)
>    *at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1478)*
>    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:94)
>    at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:120)
>    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:94)
>    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
>    at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:76)
>    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
>    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
>    at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:211)
>    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:64)
>    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:97)
>    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
>    at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:76)
>    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
>    at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:68)
>    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:73)
>    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1131)
>    at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1667)
>    at org.apache.activemq.ActiveMQMessageConsumer.<init>(ActiveMQMessageConsumer.java:196)
>    at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:840)
>    at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:800)
>    at com.daxtechnologies.sams.manager.JmsSessionWrapper.createConsumer(JmsSessionWrapper.java:106)
>    at com.daxtechnologies.sams.manager.AbstractSamsManagerQueue.consume(AbstractSamsManagerQueue.java:133)
>    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.consume(SamsManagerQueues.java:602)
>    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.throttleQueue(SamsManagerQueues.java:571)
>    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.run(SamsManagerQueues.java:622)
>    at com.daxtechnologies.sams.manager.SamsManagerThreadPool$WorkerThead.run(SamsManagerThreadPool.java:188)
> Caused by: java.io.InterruptedIOException: Interrupted.
>    at org.apache.activemq.transport.FutureResponse.set(FutureResponse.java:56)
>    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:89)
>
>
> Adrian Tarau wrote:
>> Even more :)
>>
>> As I told you, I created a Session wrapper around the ActiveMQ session
>> in order to catch the close() call.
>>
>> You can see the close method below:
>>
>> public void close() throws JMSException {
>>     try {
>>         // The error is never thrown; it is created only to capture the call stack.
>>         IllegalAccessError illegalAccessError = new IllegalAccessError("I don't want to close");
>>         StringWriter sw = new StringWriter();
>>         illegalAccessError.printStackTrace(new PrintWriter(sw));
>>         LOG.error("Somebody closed the session, let's see who\n\n" + sw.toString());
>>         System.out.println("Somebody closed the session, let's see who\n\n" + sw.toString());
>>     } finally {
>>         session.close();
>>     }
>> }
>>
>> Well, I looked in the error log and in the container log (JBoss) and
>> didn't find any of these "Somebody ..." strings. The conclusion is that
>> nobody called close() explicitly on the session.
>> I deployed this on 3 systems, and over the weekend all of them hit the
>> problem. One thing the systems have in common is that they are all
>> heavily loaded (they are test machines), and on all of them everything
>> started failing with "Session is closed" after I got this exception
>> (see below).
>>
>> I don't know why we got java.io.InterruptedIOException; it could be
>> because of me :). We interrupt the current thread (the one consuming the
>> message) if it hasn't responded after 60 seconds (a kind of watchdog),
>> so it may be caused by my explicit call to Thread.interrupt().
>>
>> Anyway, I think this should not affect the session's state: the closed
>> field should not be changed by any code other than close(), right? The
>> session should not be affected by an interrupt exception in a consumer.
>>
>> That's all I could find out about the "Session is closed" problem.
>> Anyway, I think I will create a new session every time to avoid it.
>>
>> Thanks.
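
A small sketch of the watchdog effect described above, using only java.util.concurrent and no ActiveMQ classes. InterruptDemo and workerWasInterrupted are hypothetical names, and the CountDownLatch merely stands in for a thread blocked waiting for a broker response. It illustrates that an interrupt delivered to a thread in a blocking wait surfaces inside that wait, which matches the "Caused by: java.io.InterruptedIOException" entries in the traces below. It does not reproduce ActiveMQ's own handling.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: a watchdog interrupt lands inside a blocking wait. */
class InterruptDemo {

    /** Returns true if the worker's blocking wait was cut short by an interrupt. */
    static boolean workerWasInterrupted() throws InterruptedException {
        final CountDownLatch response = new CountDownLatch(1); // never counted down
        final boolean[] interrupted = {false};

        Thread worker = new Thread(new Runnable() {
            public void run() {
                try {
                    // Stands in for a synchronous request waiting on a reply.
                    response.await(10, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    interrupted[0] = true;
                    // Restore the flag so code further up the stack can see it.
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.start();

        // Plays the role of the watchdog: interrupt the worker, then wait for it.
        worker.interrupt();
        worker.join();
        return interrupted[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("interrupted during wait: " + workerWasInterrupted());
    }
}
```

One possible design consequence, offered as a suggestion rather than something confirmed in this thread: a watchdog that signals the worker through its own flag or timeout, instead of Thread.interrupt(), would not cut into a client library call that happens to be blocked at that moment.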
>>
>>
>> 2006-07-15 03:24:54,628 [AcitveMQ Connection Worker: vm://localhost#0] ERROR com.daxtechnologies.sams.manager.SamsManagerJMS - JMS Error
>> javax.jms.JMSException: Interrupted.
>>    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:45)
>>    at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1462)
>>    at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1478)
>>    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:94)
>>    at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:120)
>>    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:94)
>>    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
>>    at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:76)
>>    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
>>    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
>>    at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:211)
>>    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:64)
>>    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:97)
>>    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
>>    at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:76)
>>    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
>>    at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:68)
>>    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:73)
>>    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1131)
>>    at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1667)
>>    at org.apache.activemq.ActiveMQMessageConsumer.<init>(ActiveMQMessageConsumer.java:196)
>>    at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:840)
>>    at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:800)
>>    at com.daxtechnologies.sams.manager.JmsSessionWrapper.createConsumer(JmsSessionWrapper.java:106)
>>    at com.daxtechnologies.sams.manager.AbstractSamsManagerQueue.consume(AbstractSamsManagerQueue.java:133)
>>    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.consume(SamsManagerQueues.java:602)
>>    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.throttleQueue(SamsManagerQueues.java:571)
>>    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.run(SamsManagerQueues.java:622)
>>    at com.daxtechnologies.sams.manager.SamsManagerThreadPool$WorkerThead.run(SamsManagerThreadPool.java:188)
>> Caused by: java.io.InterruptedIOException: Interrupted.
>>    at org.apache.activemq.transport.FutureResponse.set(FutureResponse.java:56)
>>    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:89)
>>    ... 23 more
>> 2006-07-15 03:24:54,805 [SAMS-Worker-queue.contact_type-3] ERROR com.daxtechnologies.sams.manager.AbstractSamsManagerQueue - Cannot consume message from queue queue 'Contact Queue <system/ady@daxtechnologies.com>' javax.jms.JMSException: Interrupted.
>>    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:57)
>>    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1141)
>>    at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1667)
>>    at org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:516)
>>    at com.daxtechnologies.sams.manager.AbstractSamsManagerQueue.consume(AbstractSamsManagerQueue.java:145)
>>    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.consume(SamsManagerQueues.java:602)
>>    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.throttleQueue(SamsManagerQueues.java:571)
>>    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.run(SamsManagerQueues.java:622)
>>    at com.daxtechnologies.sams.manager.SamsManagerThreadPool$WorkerThead.run(SamsManagerThreadPool.java:188)
>> Caused by: java.io.InterruptedIOException: Interrupted.
>>    at org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
>>    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:74)
>>    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1131)
>>    ... 7 more
>>
>> Adrian Tarau wrote:
>>> More about the closed session :)
>>>
>>> Could the closing problem occur because the Session is not per
>>> thread, even though it is accessed by only one thread at a time (or
>>> should be, but sometimes is not: backport-concurrent problems)?
>>>
>>> The code looks like this:
>>> class MyStorageQueue {
>>>     private Session session;
>>>     private Queue queue;
>>>     protected final Semaphore lock = new Semaphore(1, true);
>>>     ....
>>>     public Object consume() {
>>>         ....
>>>     }
>>>
>>>     public void publish(Object object) {
>>>         ....
>>>     }
>>> }
>>>
>>> and usage in different threads
>>> MyStorageQueue queue = getNextQueue(...);
>>> if (queue.lock.tryAcquire()) {
>>>     try {
>>>         Object value = consume(queue);
>>>         .....
>>>     } finally {
>>>         queue.lock.release();
>>>     }
>>> }
>>>
>>> Anyway, I created a wrapper around the ActiveMQ session that dumps
>>> the stack trace, to see who calls the close method.
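
For the "Session is not per thread" question above, one alternative to guarding a shared session with a semaphore is to give each worker thread its own instance. This is a sketch under stated assumptions: PerThread is a hypothetical helper, not an ActiveMQ or JMS class, it requires Java 8 or later, and the thread does not establish that shared access is actually the cause of the closed session.

```java
import java.util.function.Supplier;

/** Hypothetical helper: one lazily created resource (e.g. a JMS Session) per thread. */
class PerThread<T> {

    private final ThreadLocal<T> local;

    PerThread(Supplier<T> factory) {
        // The factory runs at most once per thread, on that thread's first get().
        this.local = ThreadLocal.withInitial(factory);
    }

    /** Returns the calling thread's own instance, creating it on first use. */
    T get() {
        return local.get();
    }

    /** Drops the calling thread's instance so the next get() creates a fresh one. */
    void reset() {
        local.remove();
    }
}
```

A worker would call get() before each consume or publish, and reset() after a failure such as "The Session is closed", so that the next call builds a new instance. In a thread pool the instances live as long as the pooled threads, which may or may not be acceptable.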
>>>
>>>
>>>
>>> Adrian Tarau wrote:
>>>> Remote debugging is not a problem; the problem is that I would need
>>>> to start the application in debug mode and keep the IDE attached
>>>> with a breakpoint for days until it happens.
>>>>
>>>> I think I will recompile ActiveMQ and dump the thread stack
>>>> trace :)
>>>>
>>>> Sanjiv Jivan wrote:
>>>>> You can attach your debugger to the Java process on the remote
>>>>> machine and set the breakpoint on the session.close() call. Once
>>>>> the breakpoint is hit, you can examine the call stack in your
>>>>> debugger.
>>>>>
>>>>> Use the JPDA socket JVM args when starting your Java process. E.g.:
>>>>> -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005
>>>>>
>>>>>
>>>>>
>>>>> On 7/13/06, Adrian Tarau <ady@daxtechnologies.com> wrote:
>>>>>>
>>>>>> That's great but this usually happens on the remote machine, 
>>>>>> after a few
>>>>>> days :)
>>>>>>
>>>>>> James Strachan wrote:
>>>>>> > Maybe try run your code in a debugger and set breakpoints in the
>>>>>> > ActiveMQSession class to see where & why the session gets closed? (The
>>>>>> > code's pretty easy to follow, its a relatively simple class).
>>>>>> >
>>>>>> >
>>>>>> > On 7/13/06, Adrian Tarau <ady@daxtechnologies.com> wrote:
>>>>>> >> James, do you want the sources or can you recommend me some actions? To
>>>>>> >> create the session all the time, not to cache it?
>>>>>> >>
>>>>>> >> Adrian Tarau wrote:
>>>>>> >> > Ok, so who closes the session? :)
>>>>>> >> >
>>>>>> >> > James Strachan wrote:
>>>>>> >> >> Sorry I forgot about that :)
>>>>>> >> >>
>>>>>> >> >> The only time a VM broker is shut down is explicitly via application
>>>>>> >> >> code or a shutdown handler.
>>>>>> >> >>
>>>>>> >> >> On 7/13/06, Adrian Tarau <ady@daxtechnologies.com> wrote:
>>>>>> >> >>> I use vm connector so no connection problems should be involved, right?
>>>>>> >> >>>
>>>>>> >> >>> James Strachan wrote:
>>>>>> >> >>> > BTW are you using auto-reconnect? It could be that the socket is
>>>>>> >> >>> > terminated due to some network issue?
>>>>>> >> >>> >
>>>>>> >> >>> > http://incubator.apache.org/activemq/how-can-i-support-auto-reconnection.html
>>>>>> >> >>> >
>>>>>> >> >>> >
>>>>>> >> >>> > On 7/13/06, Adrian Tarau <ady@daxtechnologies.com> wrote:
>>>>>> >> >>> >> I searched for "close" in the source code and, apart from producers,
>>>>>> >> >>> >> consumers, input streams and output streams, I don't close the
>>>>>> >> >>> >> connection or session, except on JVM shutdown (shutdown hook).
>>>>>> >> >>> >>
>>>>>> >> >>> >> I can provide you the source code (privately, not on the mailing
>>>>>> >> >>> >> list), because this is very annoying.
>>>>>> >> >>> >> Thanks.
>>>>>> >> >>> >>
>>>>>> >> >>> >> James Strachan wrote:
>>>>>> >> >>> >> > Apart from inactivity timeouts on the transport layer, we generally
>>>>>> >> >>> >> > don't close sessions. Are you sure nothing in your application code is
>>>>>> >> >>> >> > trying to close the session?
>>>>>> >> >>> >> >
>>>>>> >> >>> >> > On 7/13/06, Adrian Tarau <ady@daxtechnologies.com> wrote:
>>>>>> >> >>> >> >> I have had this issue for some time with ActiveMQ 4.0 and 4.0.1.
>>>>>> >> >>> >> >>
>>>>>> >> >>> >> >> I use the vm transport and create one session that is used to produce
>>>>>> >> >>> >> >> and consume messages. Everything works fine for days in a row, until
>>>>>> >> >>> >> >> it starts to throw exceptions saying "Session is closed". Are there
>>>>>> >> >>> >> >> any watchdogs that close sessions after a while, based on some criteria?
>>>>>> >> >>> >> >>
>>>>>> >> >>> >> >> I couldn't find any rule for when or why it happens.
>>>>>> >> >>> >> >> Should I create a new session every time? I understood that creating
>>>>>> >> >>> >> >> one is time-consuming and that it should be safe to cache it.
>>>>>> >> >>> >> >>
>>>>>> >> >>> >> >> Thanks.
>>>>>> >> >>> >> >>
>>>>>> >> >>> >> >> *javax.jms.IllegalStateException: The Session is closed
>>>>>> >> >>> >> >>     at org.apache.activemq.ActiveMQSession.checkClosed(ActiveMQSession.java:577)
>>>>>> >> >>> >> >>     at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:799)*
>>>>>> >> >>> >> >>
>>>>>> >> >>> >> >
>>>>>> >> >>> >> >
>>>>>> >> >>> >>
>>>>>> >> >>> >>
>>>>>> >> >>> >
>>>>>> >> >>> >
>>>>>> >> >>>
>>>>>> >> >>>
>>>>>> >> >>
>>>>>> >> >>
>>>>>> >> >
>>>>>> >>
>>>>>> >>
>>>>>> >
>>>>>> >
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>
>
>

