camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [3/3] camel git commit: CAMEL-11698: Make s3 consumer auto close s3objects if an exception was thrown during creating exchanges/batch to avoid leaking resources. Thanks to Mykhailo Vlakh for reporting.
Date Tue, 12 Sep 2017 07:51:02 GMT
CAMEL-11698: Make s3 consumer auto close s3objects if an exception was thrown during creating
exchanges/batch to avoid leaking resources. Thanks to Mykhailo Vlakh for reporting.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ccb8d4e6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ccb8d4e6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ccb8d4e6

Branch: refs/heads/camel-2.18.x
Commit: ccb8d4e60068fd607bfacc0eb2312557818f968a
Parents: fd6fb19
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Tue Sep 12 09:39:55 2017 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Tue Sep 12 09:49:04 2017 +0200

----------------------------------------------------------------------
 .../camel/component/aws/s3/S3Consumer.java      | 24 ++++++++++++++++----
 .../camel/component/aws/s3/S3Endpoint.java      | 11 +++------
 2 files changed, 22 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ccb8d4e6/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index 5fb4936..7483ecd 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.aws.s3;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
@@ -35,6 +37,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
@@ -112,12 +115,23 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Received {} messages in this poll", s3ObjectSummaries.size());
         }
-        
+
+        Collection<S3Object> s3Objects = new ArrayList<>();
         Queue<Exchange> answer = new LinkedList<Exchange>();
-        for (S3ObjectSummary s3ObjectSummary : s3ObjectSummaries) {
-            S3Object s3Object = getAmazonS3Client().getObject(s3ObjectSummary.getBucketName(), s3ObjectSummary.getKey());
-            Exchange exchange = getEndpoint().createExchange(s3Object);
-            answer.add(exchange);
+        try {
+            for (S3ObjectSummary s3ObjectSummary : s3ObjectSummaries) {
+                S3Object s3Object = getAmazonS3Client().getObject(s3ObjectSummary.getBucketName(), s3ObjectSummary.getKey());
+                s3Objects.add(s3Object);
+
+                Exchange exchange = getEndpoint().createExchange(s3Object);
+                answer.add(exchange);
+            }
+        } catch (Throwable e) {
+            LOG.warn("Error getting S3Object due: " + e.getMessage(), e);
+            // ensure all previous gathered s3 objects are closed
+            // if there was an exception creating the exchanges in this batch
+            s3Objects.forEach(IOHelper::close);
+            throw e;
         }
 
         return answer;

http://git-wip-us.apache.org/repos/asf/camel/blob/ccb8d4e6/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
index 05a61f4..86197a8 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Endpoint.java
@@ -44,6 +44,7 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
 
 import org.slf4j.Logger;
@@ -188,19 +189,13 @@ public class S3Endpoint extends ScheduledPollEndpoint {
          * As of 2.17, the consumer does not close the stream or object on commit.
          */
         if (!configuration.isIncludeBody()) {
-            try {
-                s3Object.close();
-            } catch (IOException e) {
-            }
+            IOHelper.close(s3Object);
         } else {
             if (configuration.isAutocloseBody()) {
                 exchange.addOnCompletion(new SynchronizationAdapter() {
                     @Override
                     public void onDone(Exchange exchange) {
-                        try {
-                            s3Object.close();
-                        } catch (IOException e) {
-                        }
+                        IOHelper.close(s3Object);
                     }
                 });
             }


Mime
View raw message