camel-users mailing list archives

From Leandro Franchi <leandro.fran...@gmail.com>
Subject Re: File do ActiveMQ
Date Wed, 07 Nov 2012 10:15:37 GMT
Hi Christian, first of all, thanks for the reply.

I have 160 routes like rt-* (see below). These routes poll files in
directories and send each file to a seda endpoint. The seda endpoint
processes each message and sends the processed message to ActiveMQ. I'm
using ActiveMQ because these processes run on another computer.

I can't use the recursive option on the file endpoint because I have a lot
of business rules that depend on the complete file path, etc.

I'm using Apache Camel 2.10.2 and ActiveMQ 5.7.

The routes are as follows:


<!-- Retrieve files from directory 1, server A -->
<route id="rt-147" errorHandlerRef="myErrorHandler" shutdownRoute="Defer" autoStartup="true">
     <from uri="file:/mnt/serverA/1?charset=iso-8859-1&amp;readLock=changed&amp;readLockCheckInterval=200&amp;readLockMinLength=0&amp;delay=500&amp;maxMessagesPerPoll=100&amp;sortBy=${file:modified}" />
     <onException useOriginalMessage="true">
         <exception>java.text.ParseException</exception>
         <exception>java.lang.ArrayIndexOutOfBoundsException</exception>
         <redeliveryPolicy maximumRedeliveries="0"/>
         <handled><constant>true</constant></handled>
         <to uri="activemq:queue:alarm" />
     </onException>
     <choice>
         <when>
             <simple>${file:name} contains '_FILE1.txt'</simple>
             <to uri="seda:fe_FILE1" />
         </when>
         <when>
             <simple>${file:name} contains '_FILE2.txt'</simple>
             <to uri="seda:fe_FILE2" />
         </when>
     </choice>
</route>

<!-- Retrieve files from directory 2, server A -->
<route id="rt-148" errorHandlerRef="myErrorHandler" shutdownRoute="Defer" autoStartup="true">
     <from uri="file:/serverA/2?charset=iso-8859-1&amp;readLock=changed&amp;readLockCheckInterval=200&amp;readLockMinLength=0&amp;delay=500&amp;maxMessagesPerPoll=100&amp;sortBy=${file:modified}" />
     <onException useOriginalMessage="true">
         <exception>java.text.ParseException</exception>
         <exception>java.lang.ArrayIndexOutOfBoundsException</exception>
         <redeliveryPolicy maximumRedeliveries="0"/>
         <handled><constant>true</constant></handled>
         <to uri="activemq:queue:alarm" />
     </onException>
     <choice>
         <when>
             <simple>${file:name} contains '_FILE1.txt'</simple>
             <to uri="seda:fe_FILE1" />
         </when>
         <when>
             <simple>${file:name} contains '_FILE2.txt'</simple>
             <to uri="seda:fe_FILE2" />
         </when>
     </choice>
</route>

<!-- Retrieve files from directory 1, server B -->
<route id="rt-149" errorHandlerRef="myErrorHandler" shutdownRoute="Defer" autoStartup="true">
     <from uri="file:/serverB/1?charset=iso-8859-1&amp;readLock=changed&amp;readLockCheckInterval=200&amp;readLockMinLength=0&amp;delay=500&amp;maxMessagesPerPoll=100&amp;sortBy=${file:modified}" />
     <onException useOriginalMessage="true">
         <exception>java.text.ParseException</exception>
         <exception>java.lang.ArrayIndexOutOfBoundsException</exception>
         <redeliveryPolicy maximumRedeliveries="0"/>
         <handled><constant>true</constant></handled>
         <to uri="activemq:queue:alarm" />
     </onException>
     <choice>
         <when>
             <simple>${file:name} contains '_FILE1.txt'</simple>
             <to uri="seda:fe_FILE1" />
         </when>
         <when>
             <simple>${file:name} contains '_FILE2.txt'</simple>
             <to uri="seda:fe_FILE2" />
         </when>
     </choice>
</route>

<!-- Processor for files of type 1 -->
<route id="sedaFile1" errorHandlerRef="mirErrorHandler" shutdownRoute="Defer" autoStartup="true">
     <from uri="seda:fe_FILE1?concurrentConsumers=10&amp;waitForTaskToComplete=Never" />
     <process ref="switchFileFrontendCounterProcessor" />
     <setHeader headerName="ALARM_TARGET"><constant>F1</constant></setHeader>
     <to uri="activemq:queue:alarm" />
</route>

<!-- Processor for files of type 2 -->
<route id="sedaFile2" errorHandlerRef="mirErrorHandler" shutdownRoute="Defer" autoStartup="true">
     <from uri="seda:fe_FILE2?concurrentConsumers=10&amp;waitForTaskToComplete=Never" />
     <process ref="switchFileFrontendCounterProcessor" />
     <setHeader headerName="ALARM_TARGET"><constant>F2</constant></setHeader>
     <to uri="activemq:queue:alarm" />
</route>

<!-- Queue for processing alarms -->
<route id="file-alarm" errorHandlerRef="mirErrorHandler" shutdownRoute="Defer" autoStartup="true">
     <from uri="activemq:alarm" />
     <choice>
         <when>
             <simple>${headers[ALARM_TARGET]} == "F1"</simple>
             <to uri="sql:exec Prc_File1 #, #, # ?dataSourceRef=myDatasource" />
         </when>
     </choice>
     <to uri="bean://alarmEngineBean"/>
</route>

The ActiveMQ configuration is:


<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
     <property name="brokerURL" value="tcp://10.59.133.123:4848" />
</bean>

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
     <property name="maxConnections" value="10" />
     <property name="maximumActive" value="10" />
     <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
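
The wiring of the activemq component to this pool is not shown above. As a
rough sketch only, a typical setup might look like the following. The bean
ids "jmsConfig" and "activemq" are assumptions for illustration, not taken
from the actual configuration:

<!-- Hypothetical wiring: bean ids jmsConfig and activemq are assumed -->
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
     <property name="connectionFactory" ref="pooledConnectionFactory" />
</bean>

<!-- The bean id must match the URI scheme used in the routes (activemq:...) -->
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
     <property name="configuration" ref="jmsConfig" />
</bean>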

Thanks for everything
Leandro Franchi


On 06-11-2012 21:45, Christian Müller wrote:
> Which version do you use?
> What does your route look like?
>
> Sent from a mobile device
> On 06.11.2012 19:29, "Leandro Franchi" <leandro.franchi@gmail.com> wrote:
>
>> Hi,
>>
>> I have a route that reads files, converts the body to a string, and sends
>> it to ActiveMQ. It works, but sometimes I get this error:
>>
>>
>> 06/11/2012 16:19:19.174 [Camel (mirCamel) thread #21 - file:/tmp] ERROR org.apache.camel.processor.FatalFallbackErrorHandler:55 - \--> New exception on exchangeId: ID-alxredmircol01-51775-1352225432241-0-14234
>> org.apache.camel.TypeConversionException: Error during type conversion from type: org.apache.camel.component.file.GenericFile to the required type: byte[] with value GenericFile[/mnt/sox/HST/20121030_174713_N.txt] due java.lang.IllegalArgumentException: reader must be specified
>>      at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:126)
>>      at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:534)
>>      at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:464)
>>      at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:285)
>>      at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:266)
>>      at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:214)
>>      at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:157)
>>      at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$3.doInJms(JmsConfiguration.java:191)
>>      at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)
>>      at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:188)
>>      at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:398)
>>      at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:352)
>>      at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:132)
>>      at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>>      at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:120)
>>      at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:292)
>>      at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:115)
>>      at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>>      at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
>>      at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>>      at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:73)
>>      at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>>      at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
>>      at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>>      at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
>>      at org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
>>      at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)
>>      at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>>      at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303)
>>      at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>>      at org.apache.camel.processor.Pipeline.process(Pipeline.java:117)
>>      at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
>>      at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>>      at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
>>      at org.apache.camel.processor.FatalFallbackErrorHandler.processNext(FatalFallbackErrorHandler.java:42)
>>      at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>>      at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>>      at org.apache.camel.processor.RedeliveryErrorHandler.deliverToFailureProcessor(RedeliveryErrorHandler.java:759)
>>      at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:273)
>>      at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:220)
>>      at org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
>>      at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)
>>      at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>>      at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303)
>>      at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>>      at org.apache.camel.processor.Pipeline.process(Pipeline.java:117)
>>      at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
>>      at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)
>>      at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>>      at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
>>      at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
>>      at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
>>      at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>>      at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>>      at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
>>      at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>>      at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:73)
>>      at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:336)
>>      at br.com.flexvision.mir.collector.component.sw.file.FileSwitchConsumer.processExchange(FileSwitchConsumer.java:26)
>>      at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:189)
>>      at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:155)
>>      at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:139)
>>      at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:91)
>>      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>      at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
>>      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
>>      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>      at java.lang.Thread.run(Thread.java:722)
>> Caused by: org.apache.camel.RuntimeCamelException: java.lang.IllegalArgumentException: reader must be specified
>>      at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1280)
>>      at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:936)
>>      at org.apache.camel.impl.converter.StaticMethodTypeConverter.convertTo(StaticMethodTypeConverter.java:47)
>>      at org.apache.camel.component.file.GenericFileConverter.convertTo(GenericFileConverter.java:93)
>>      at sun.reflect.GeneratedMethodAccessor102.invoke(Unknown Source)
>>      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>      at java.lang.reflect.Method.invoke(Method.java:601)
>>      at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:932)
>>      at org.apache.camel.impl.converter.StaticMethodFallbackTypeConverter.convertTo(StaticMethodFallbackTypeConverter.java:50)
>>      at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:289)
>>      at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:111)
>>      ... 70 more
>> Caused by: java.lang.IllegalArgumentException: reader must be specified
>>      at org.apache.camel.util.ObjectHelper.notNull(ObjectHelper.java:290)
>>      at org.apache.camel.util.IOHelper.buffered(IOHelper.java:117)
>>      at org.apache.camel.converter.IOConverter.toByteArray(IOConverter.java:261)
>>      at sun.reflect.GeneratedMethodAccessor230.invoke(Unknown Source)
>>      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>      at java.lang.reflect.Method.invoke(Method.java:601)
>>      at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:932)
>>      ... 79 more
>>
>> Does anyone have an idea?
>>
>> Thanks
>> Leandro Franchi
>>

