camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [2/2] camel git commit: CAMEL-9784: aws s3 consumer should keep polling if deleteAfterRead is false, otherwise it only poll data one time and then never anymore.
Date Thu, 28 Apr 2016 07:05:51 GMT
CAMEL-9784: aws s3 consumer should keep polling if deleteAfterRead is false, otherwise it only
poll data one time and then never anymore.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9f16e397
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9f16e397
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9f16e397

Branch: refs/heads/master
Commit: 9f16e397cb8823f9317bd08a7f0f87ee05e1d5b8
Parents: 3b4b522
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Thu Apr 28 09:02:28 2016 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Thu Apr 28 09:05:44 2016 +0200

----------------------------------------------------------------------
 components/camel-aws/src/main/docs/aws-s3.adoc  |  4 +-
 .../camel/component/aws/s3/S3Configuration.java | 12 ++--
 .../camel/component/aws/s3/S3Consumer.java      | 72 +++++++++-----------
 3 files changed, 42 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9f16e397/components/camel-aws/src/main/docs/aws-s3.adoc
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/docs/aws-s3.adoc b/components/camel-aws/src/main/docs/aws-s3.adoc
index 6bf206a..e291580 100644
--- a/components/camel-aws/src/main/docs/aws-s3.adoc
+++ b/components/camel-aws/src/main/docs/aws-s3.adoc
@@ -38,6 +38,7 @@ The AWS S3 Storage Service component has no options.
 
 
 
+
 // endpoint options: START
 The AWS S3 Storage Service component supports 38 endpoint options which are listed below:
 
@@ -53,7 +54,7 @@ The AWS S3 Storage Service component supports 38 endpoint options which
are list
 | proxyPort | common |  | Integer | Camel 2.16: Specify a proxy port to be used inside the
client definition.
 | secretKey | common |  | String | Amazon AWS Secret Key
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the
Camel routing Error Handler which mean any exceptions occurred while the consumer is trying
to pickup incoming messages or the likes will now be processed as a message and handled by
the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler
to deal with exceptions that will be logged at WARN/ERROR level and ignored.
-| deleteAfterRead | consumer | true | boolean | Delete objects from S3 after they have been
retrieved. The delete is only performed if the Exchange is committed. If a rollback occurs
the object is not deleted.
+| deleteAfterRead | consumer | true | boolean | Delete objects from S3 after they have been
retrieved. The delete is only performed if the Exchange is committed. If a rollback occurs
the object is not deleted. If this option is false then the same objects will be retrieve
over and over again on the polls. Therefore you need to use the Idempotent Consumer EIP in
the route to filter out duplicates. You can filter using the link S3ConstantsBUCKET_NAME and
link S3ConstantsKEY headers or only the link S3ConstantsKEY header.
 | fileName | consumer |  | String | To get the object from the bucket with the given file
name
 | includeBody | consumer | true | boolean | Camel 2.17: If it is true the exchange body will
be set to a stream to the contents of the file. If false the headers will be set with the
S3 object metadata but the body will be null.
 | maxMessagesPerPoll | consumer | 10 | int | Gets the maximum number of messages as a limit
to poll at each polling. Is default unlimited but use 0 or negative number to disable it as
unlimited.
@@ -88,6 +89,7 @@ The AWS S3 Storage Service component supports 38 endpoint options which
are list
 
 
 
+
 |=======================================================================
 
 Required S3 component options

http://git-wip-us.apache.org/repos/asf/camel/blob/9f16e397/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
index 1057763..83c33d2 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Configuration.java
@@ -171,10 +171,6 @@ public class S3Configuration implements Cloneable {
         this.region = region;
     }
 
-    public boolean isDeleteAfterRead() {
-        return deleteAfterRead;
-    }
-
     /**
      * *Camel 2.17*: If it is true, the exchange body will be set to a stream to the contents
of the file.
      * If false, the headers will be set with the S3 object metadata, but the body will be
null.
@@ -187,9 +183,17 @@ public class S3Configuration implements Cloneable {
         return includeBody;
     }
 
+    public boolean isDeleteAfterRead() {
+        return deleteAfterRead;
+    }
+
     /**
      * Delete objects from S3 after they have been retrieved.  The delete is only performed
if the Exchange is committed.
      * If a rollback occurs, the object is not deleted.
+     * <p/>
+     * If this option is false, then the same objects will be retrieve over and over again
on the polls. Therefore you
+     * need to use the Idempotent Consumer EIP in the route to filter out duplicates. You
can filter using the
+     * {@link S3Constants#BUCKET_NAME} and {@link S3Constants#KEY} headers, or only the {@link
S3Constants#KEY} header.
      */
     public void setDeleteAfterRead(boolean deleteAfterRead) {
         this.deleteAfterRead = deleteAfterRead;

http://git-wip-us.apache.org/repos/asf/camel/blob/9f16e397/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
index eab0508..5fb4936 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/s3/S3Consumer.java
@@ -43,13 +43,11 @@ import org.slf4j.LoggerFactory;
 /**
  * A Consumer of messages from the Amazon Web Service Simple Storage Service
  * <a href="http://aws.amazon.com/s3/">AWS S3</a>
- * 
  */
 public class S3Consumer extends ScheduledBatchPollingConsumer {
     
     private static final Logger LOG = LoggerFactory.getLogger(S3Consumer.class);
     private String marker;
-    private boolean filesConsumed;
     private transient String s3ConsumerToString;
 
     public S3Consumer(S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException
{
@@ -66,48 +64,40 @@ public class S3Consumer extends ScheduledBatchPollingConsumer {
         String bucketName = getConfiguration().getBucketName();
         Queue<Exchange> exchanges;
         
-        if (filesConsumed) {
-            exchanges = new LinkedList<Exchange>();
+        if (fileName != null) {
+            LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName,
fileName);
+
+            S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName,
fileName));
+            exchanges = createExchanges(s3Object);
         } else {
-            if (fileName != null) {
-                LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName,
fileName);
-    
-                S3Object s3Object = getAmazonS3Client().getObject(new GetObjectRequest(bucketName,
fileName));
-                exchanges = createExchanges(s3Object);
-                if (!getConfiguration().isDeleteAfterRead()) {
-                    filesConsumed = true;
-                }
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+            listObjectsRequest.setBucketName(bucketName);
+            listObjectsRequest.setPrefix(getConfiguration().getPrefix());
+            if (maxMessagesPerPoll > 0) {
+                listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
+            }
+            // if there was a marker from previous poll then use that to continue from where
we left last time
+            if (marker != null) {
+                LOG.trace("Resuming from marker: {}", marker);
+                listObjectsRequest.setMarker(marker);
+            }
+
+            ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
+            if (listObjects.isTruncated()) {
+                marker = listObjects.getNextMarker();
+                LOG.trace("Returned list is truncated, so setting next marker: {}", marker);
             } else {
-                LOG.trace("Queueing objects in bucket [{}]...", bucketName);
-            
-                ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
-                listObjectsRequest.setBucketName(bucketName);
-                listObjectsRequest.setPrefix(getConfiguration().getPrefix());
-                if (maxMessagesPerPoll > 0) {
-                    listObjectsRequest.setMaxKeys(maxMessagesPerPoll);
-                }
-                if (marker != null && !getConfiguration().isDeleteAfterRead()) {
-                    listObjectsRequest.setMarker(marker);
-                }
-            
-                ObjectListing listObjects = getAmazonS3Client().listObjects(listObjectsRequest);
-                // we only setup the marker if the file is not deleted
-                if (!getConfiguration().isDeleteAfterRead()) {
-                    // if the marker is truncated, the nextMarker should not be null
-                    if (listObjects.getNextMarker() != null) {
-                        marker = listObjects.getNextMarker();
-                    } else {
-                        // if there is no marker, the files are consumed, we should not pull
it again
-                        filesConsumed = true;
-                    }
-                }
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(),
bucketName);
-                }
-            
-                exchanges = createExchanges(listObjects.getObjectSummaries());
+                // no more data so clear marker
+                marker = null;
             }
-        }    
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.getObjectSummaries().size(),
bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.getObjectSummaries());
+        }
         return processBatch(CastUtils.cast(exchanges));
     }
     


Mime
View raw message