camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From David Tombs <cyan.s...@gmail.com>
Subject Re: Freeing objects returned by type converter
Date Mon, 10 Jan 2011 16:31:40 GMT
On Mon, Jan 10, 2011 at 5:03 AM, Willem Jiang <willem.jiang@gmail.com>wrote:


>  On 1/8/11 12:45 AM, David Tombs wrote:
>
>
>> On Fri, Jan 7, 2011 at 10:39 AM, Claus Ibsen<claus.ibsen@gmail.com>
>>  wrote:
>>
>>
>>
>>>  On Fri, Jan 7, 2011 at 4:14 PM, David Tombs<cyan.spam@gmail.com>
>>>  wrote:
>>>
>>>
>>>> On 1/7/11, James Strachan<james@fusesource.com>  wrote:
>>>>
>>>>
>>>>> On 7 January 2011 14:51, David Tombs<cyan.spam@gmail.com>  wrote:
>>>>>
>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> I have recently started using camel and it's fantastic. Thanks for
all
>>>>>> your hard work, committers.
>>>>>>
>>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>>
>>>>>> One issue I came across is whether I should free or close objects
>>>>>> returned by a type converter. For example, I have a class that reads
>>>>>> some binary data from an InputStream so I wrote a custom Processor
>>>>>> that does this:
>>>>>>
>>>>>> InputStream productStream =
>>>>>>
>>>>>>
>>>>> exchange.getIn().getBody(InputStream.class);
>>>
>>>
>>>>  It appears that I need to call close() on productStream or else my
>>>>>> file consumer was leaving open handles to the source files. Is this
>>>>>> always the case? If the body was already an InputStream, would calling
>>>>>> close() be harmful? Am I doing something wrong?
>>>>>>
>>>>>> I couldn't find anything in the documentation regarding this--type
>>>>>> converters were always used to convert to Strings in examples.
>>>>>>
>>>>>>
>>>>> Once you've created the stream, its up to you to close it. So you
>>>>> might want to create a little helper class to make sure you always
>>>>> call close on it. I guess we could add a little helper class to Camel
>>>>> to make it a bit easier to use streams and ensure they get closed.
>>>>>
>>>>> --
>>>>>
>>>>> --
>>>>> James
>>>>>
>>>>>
>>>> Thanks for the quick reply, James.
>>>>
>>>> One remaining question: how does this work if the body was already an
>>>> InputStream? Would a type converter even be used in that case? Would
>>>> the IS I get back be a clone of the "real" IS or something like that?
>>>>
>>>>
>>>>
>>> If the body is already the type you want to convert to, then its returned
>>> as is.
>>>
>>>
>>
>> Then is closing it is harmful.
>>
>> I just tested converting the body to an InputStream before sending it to
>> my
>> Processor as below:
>>
>>          <from
>>
>> uri="file:messages/radarIn?delete=true&amp;exclude=.*.tmp&amp;readLock=none"
>> />
>>          <convertBodyTo type="java.io.InputStream" />
>>          <!-- Add dynamic routing properties -->
>>          <process ref="radarPropertyAdder" />
>>          <to uri="jms:topic:HPWC.IN?testConnectionOnStartup=true<http://hpwc.in/?testConnectionOnStartup=true>"
>> />
>>
>> The producer endpoint then kept throwing NullPointerExceptions.
>>
>> So what's the general solution here? Does one have to figure out what type
>> the body will be then bake that assumption into the code? A more
>> reasonable
>> alternative would be to have getBody() always return an object that the
>> user
>> owns, but that would require cloning objects like InputStream that don't
>> support clone().
>>
>> I suppose a workaround for now could be:
>>
>>       if (productStream != exchange.getIn().getBody())
>>       {
>>          productStream.close();
>>       }
>>
>> That should avoid the harmful close, right?
>>
>> Thanks again,
>> David
>>
>>
>>
>
> Can I have a look at the stack trace of the NPE?
> Maybe you just need to check if the message body is null.
>
>

Sure, see below. It seems that after I close() the IS, it (naturally) fails
to type convert to byte[] which is needed to send a BytesMessage. My
above-posted workaround seems to work fine.

2011-01-10 11:26:02,414 DEBUG: About to process file:
GenericFile[foo.product] using exchange: Exchange[GenericFileMessage with
file: GenericFile[foo.product]]
2011-01-10 11:26:02,419 DEBUG: Executing callback on JMS Session:
weblogic.jms.client.WLSessionImpl@76b19bef
2011-01-10 11:26:02,421 DEBUG: No type converter available to convert from
type: java.io.BufferedInputStream to the required type: byte[] with value
java.io.BufferedInputStream@674a4ca Caused by: java.io.IOException: Stream
closed. Will ignore this and continue.
2011-01-10 11:26:02,424 DEBUG: Failed delivery for exchangeId:
c160e024-aa5c-4e61-9baf-7c947da638c2. On delivery attempt: 0 caught:
java.lang.NullPointerException
2011-01-10 11:26:02,424 ERROR: Failed delivery for exchangeId:
c160e024-aa5c-4e61-9baf-7c947da638c2. Exhausted after delivery attempt: 1
caught: java.lang.NullPointerException
java.lang.NullPointerException
 at java.io.OutputStream.write(OutputStream.java:58)[:1.6.0_20]
 at
weblogic.jms.common.BytesMessageImpl.writeBytes(BytesMessageImpl.java:490)[wlfullclient-10.3.0.0.jar:10.3.0.0]
 at
org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:471)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:277)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:316)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:221)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:164)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$3.doInJms(JmsConfiguration.java:198)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:471)[spring-jms-2.5.6.jar:2.5.6]
 at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:195)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:375)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:320)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:150)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.SendProcessor$1.doInProducer(SendProcessor.java:106)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.SendProcessor$1.doInProducer(SendProcessor.java:103)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:189)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.SendProcessor.doProcess(SendProcessor.java:103)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:87)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.DelegateProcessor.processNext(DelegateProcessor.java:53)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.DelegateProcessor.proceed(DelegateProcessor.java:82)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:97)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.RedeliveryErrorHandler.processExchange(RedeliveryErrorHandler.java:185)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:151)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:89)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:49)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:228)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.Pipeline.process(Pipeline.java:75)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.UnitOfWorkProcessor.processNext(UnitOfWorkProcessor.java:70)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.DelegateProcessor.process(DelegateProcessor.java:48)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:269)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:137)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:103)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:98)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)[:1.6.0_20]
 at
java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)[:1.6.0_20]
 at
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)[:1.6.0_20]
 at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)[:1.6.0_20]
 at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)[:1.6.0_20]
 at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)[:1.6.0_20]
 at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_20]
 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_20]
 at java.lang.Thread.run(Thread.java:619)[:1.6.0_20]
2011-01-10 11:26:02,425 DEBUG: This exchange is not handled so its marked as
failed: Exchange[GenericFileMessage with body:
java.io.BufferedInputStream@674a4ca]
2011-01-10 11:26:02,425 DEBUG: Message exchange has failed so breaking out
of pipeline: Exchange[GenericFileMessage with body:
java.io.BufferedInputStream@674a4ca] Exception:
java.lang.NullPointerException
2011-01-10 11:26:02,426 DEBUG: Done processing file:
GenericFile[foo.product] using exchange: Exchange[GenericFileMessage with
body: java.io.BufferedInputStream@674a4ca]
2011-01-10 11:26:02,426 ERROR: Caused by: [java.lang.NullPointerException -
null]
java.lang.NullPointerException
 at java.io.OutputStream.write(OutputStream.java:58)[:1.6.0_20]
 at
weblogic.jms.common.BytesMessageImpl.writeBytes(BytesMessageImpl.java:490)[wlfullclient-10.3.0.0.jar:10.3.0.0]
 at
org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:471)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:277)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:316)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:221)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:164)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$3.doInJms(JmsConfiguration.java:198)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:471)[spring-jms-2.5.6.jar:2.5.6]
 at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:195)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:375)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:320)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:150)[camel-jms-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.SendProcessor$1.doInProducer(SendProcessor.java:106)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.SendProcessor$1.doInProducer(SendProcessor.java:103)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:189)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.SendProcessor.doProcess(SendProcessor.java:103)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:87)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.DelegateProcessor.processNext(DelegateProcessor.java:53)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.DelegateProcessor.proceed(DelegateProcessor.java:82)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:97)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.RedeliveryErrorHandler.processExchange(RedeliveryErrorHandler.java:185)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:151)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:89)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:49)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:228)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.Pipeline.process(Pipeline.java:75)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.UnitOfWorkProcessor.processNext(UnitOfWorkProcessor.java:70)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.processor.DelegateProcessor.process(DelegateProcessor.java:48)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:269)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:137)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:103)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:98)[camel-core-2.3.0-fuse-01-00.jar:2.3.0-fuse-01-00]
 at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)[:1.6.0_20]
 at
java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)[:1.6.0_20]
 at
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)[:1.6.0_20]
 at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)[:1.6.0_20]
 at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)[:1.6.0_20]
 at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)[:1.6.0_20]
 at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_20]
 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_20]
 at java.lang.Thread.run(Thread.java:619)[:1.6.0_20]
2011-01-10 11:26:02,426  WARN: Rollback file strategy:
org.apache.camel.component.file.strategy.GenericFileDeleteProcessStrategy@64a06824for
file: GenericFile[foo.product]

Thanks,
David

 --
Wise men _still_ seek Him.

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message