camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Claus Ibsen <claus.ib...@gmail.com>
Subject Re: Stream Cache / spool file deletion before aggregation in Multicast, involving huge data
Date Sun, 07 Sep 2014 06:13:15 GMT
Hi

Yeah I suspect we need to defer to UoW to after the aggregate method.
I have logged a ticket.
https://issues.apache.org/jira/browse/CAMEL-7787


On Fri, Sep 5, 2014 at 1:23 PM, lakshmi.prashant
<lakshmi.prashant@gmail.com> wrote:
> Hi,
>
> Mybeans.xml <http://camel.465427.n5.nabble.com/file/n5756092/Mybeans.xml>
>
> *Issue:    *
>
>   Whenever data is spooled in file via CachedOutputStream in any camel
> component in a multicast branch, that data becomes unreadable in
>
> a) Aggregation Strategy of Multicast
> b) After multicast, in case there is no aggregation strategy
>
> We are getting:
>
> a) FileNotFound issues as the file is deleted on completion of the cloned
> branch exchange.
> b) Premature end of file, when we read data from InputStream and use
> XMLReader / STAX to read the data.
>
> If we use the Constructor: new CachedOutputStream(exchange, false), the
> streamcache file will not be deleted.
> But the file may never be cleaned up & we do not want to do that.
>
>
> *Details*
>
>  We are using camel 2.13.2  - I have a multicast route with an
> AggregationStrategy.
>  And in each multicast branch, we have a custom camel component that returns
> huge data (around 4 MB) and writes to StreamCache (CachedOutputStream) and
> we need to aggregate the data in the multicast (AggregationStrrategy).
>
>
>   In the Aggregation strategy, I need to do XPath evaluation using camel
> XPathBuilder.
>   Hence, I try to read the body and convert from StreamCache to byte[] to
> avoid 'Error during type conversion from type:
> org.apache.camel.converter.stream.InputStreamCache.' in the XPathBuilder.
>
> When I try to read the body in the beginning of the AggregationStrategy, I
> get the following error.
> *
> /tmp/camel/camel-tmp-4e00bf8a-4a42-463a-b046-5ea2d7fc8161/cos6047774870387520936.tmp
> (No such file or directory), cause:
> FileNotFoundException:/tmp/camel/camel-tmp-4e00bf8a-4a42-463a-b046-5ea2d7fc8161/cos6047774870387520936.tmp
> (No such file or directory).
>         at java.io.FileInputStream.open(Native Method)
>         at java.io.FileInputStream.<init>(FileInputStream.java:138)
>         at
> org.apache.camel.converter.stream.FileInputStreamCache.createInputStream(FileInputStreamCache.java:123)
> at
> org.apache.camel.converter.stream.FileInputStreamCache.getInputStream(FileInputStreamCache.java:117)
>         at
> org.apache.camel.converter.stream.FileInputStreamCache.writeTo(FileInputStreamCache.java:93)
>         at
> org.apache.camel.converter.stream.StreamCacheConverter.convertToByteArray(StreamCacheConverter.java:102)
>         at
> com.sap.it.rt.camel.aggregate.strategies.MergeAtXPathAggregationStrategy.convertToByteArray(MergeAtXPathAggregationStrategy.java:169)
>         at
> com.sap.it.rt.camel.aggregate.strategies.MergeAtXPathAggregationStrategy.convertToXpathCompatibleType(MergeAtXPathAggregationStrategy.java:161)
>
> Following is the line of code where it is throwing an error:
>
>                             Object body = exchange.getIn().getBody();
>                 if( body instanceof StreamCache){
>                         StreamCache cache = (StreamCache)body;
>                         xml = new
> String(convertToByteArray(cache,exchange));
> exchange.getIn().setBody(xml);
>                 }
> *
>
>
>
> By disabling stream cache to write to file by setting a threshold of 10MB in
> multicast related routes,  we were able to work with the aggregation
> strategy. But we do not want to do that, as we may have incoming data that
> maybe bigger.
>
> <camel:camelContext id="multicast_xml_1" streamCache="true">
> <camel:properties>
> <camel:property key="CamelCachedOutputStreamCipherTransformation"
> value="RC4"/>
> <camel:property key=&quot;CamelCachedOutputStreamThreshold&quot;
> value=&quot;&lt;b>100000000*"/>
> </camel:properties>
>
> Note: The FileNotFound issue does not appear if we have the StreamCache
> based camel component in the route with other processors, but without
> Multicast + Aggregation .
>
> After debugging, I could understand the issue with aggregating huge data
> from StreamCache with MulticastProcessor.
>
> In MulticastProcessor.java: doProcessParallel() is called and on completion
> of the branch exchange of multicast, the CachedOutputStream deletes / cleans
> up the temporary file.
>
>  This happens even before the multicast branch exchange reaches the
> aggregation Strategy, which tries to read the data from the branch exchange.
> In case of huge data in StreamCache, the temporary file is already deleted,
> leading to FileNotFound issues.
>
>     *
>     public CachedOutputStream(Exchange exchange, boolean closedOnCompletion)
> {
>         this.strategy = exchange.getContext().getStreamCachingStrategy();
>         currentStream = new
> CachedByteArrayOutputStream(strategy.getBufferSize());
>
>         if (closedOnCompletion) {
>             // add on completion so we can cleanup after the exchange is
> done such as deleting temporary files
>             exchange.addOnCompletion(new SynchronizationAdapter() {
>                 @Override
>                 public void onDone(Exchange exchange) {
>                     try {
>                         if (fileInputStreamCache != null) {
>                             fileInputStreamCache.close();
>                         }
>                         close();                    } catch (Exception e) {
>                         LOG.warn("Error deleting temporary cache file: " +
> tempFile, e);
>                     }
>                 }
>
>                 @Override
>                 public String toString() {
>                     return "OnCompletion[CachedOutputStream]";
>                 }
>             });
>         }
>     }
>
>    public void close() throws IOException {
>         currentStream.close();
>         cleanUpTempFile();
>     }
>
> *
> <http://camel.465427.n5.nabble.com/file/n5756092/StreamCache_File_Gets_Deleted_before_Aggregation.png>
>
> I was able to circumvent the issue, if I try to set closedOnCompletion=
> false, while writing to CachedOutputStream in any component in any Multicast
> branch.
> But this is a leaky solution, because the streamcache temporary file(s) may
> then never get cleaned up.
>
> Can the MulticastProcessor be adjusted so that the multicast branch
> exchanges reach 'completion' status only, after they have been aggregated at
> the end of multicast?
>
> Please help / advise on the issue, as I am new to using camel Multicast.
>
> Thanks,
> Lakshmi
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Stream-Cache-spool-file-deletion-before-aggregation-in-Multicast-involving-huge-data-tp5756092.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/

Mime
View raw message