camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject [5/9] camel git commit: [CAMEL-9659] Add different strategies for handling the detection of new files
Date Tue, 01 Mar 2016 20:02:02 GMT
[CAMEL-9659] Add different strategies for handling the detection of new files


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/45204104
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/45204104
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/45204104

Branch: refs/heads/master
Commit: 452041047e770c24e8802f2cac7cc76080554303
Parents: ce54b04
Author: Daniel Kulp <dkulp@apache.org>
Authored: Tue Mar 1 11:48:43 2016 -0500
Committer: Daniel Kulp <dkulp@apache.org>
Committed: Tue Mar 1 14:23:07 2016 -0500

----------------------------------------------------------------------
 .../camel/component/gridfs/GridFsConsumer.java  | 98 ++++++++++++++++----
 .../camel/component/gridfs/GridFsEndpoint.java  | 48 +++++++++-
 .../component/gridfs/GridFsConsumerTest.java    | 36 ++++++-
 3 files changed, 157 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/45204104/src/main/java/org/apache/camel/component/gridfs/GridFsConsumer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/camel/component/gridfs/GridFsConsumer.java b/src/main/java/org/apache/camel/component/gridfs/GridFsConsumer.java
index 240dd47..35d77ee 100644
--- a/src/main/java/org/apache/camel/component/gridfs/GridFsConsumer.java
+++ b/src/main/java/org/apache/camel/component/gridfs/GridFsConsumer.java
@@ -23,13 +23,17 @@ import java.io.InputStream;
 import java.util.concurrent.ExecutorService;
 
 import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBCollection;
 import com.mongodb.DBCursor;
 import com.mongodb.DBObject;
+import com.mongodb.MongoException;
 import com.mongodb.gridfs.GridFSDBFile;
 import com.mongodb.util.JSON;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.gridfs.GridFsEndpoint.QueryStrategy;
 import org.apache.camel.impl.DefaultConsumer;
 
 /**
@@ -48,8 +52,6 @@ public class GridFsConsumer extends DefaultConsumer implements Runnable
{
         this.endpoint = endpoint;
     }
 
-    
-
     @Override
     protected void doStop() throws Exception {
         super.doStop();
@@ -69,7 +71,38 @@ public class GridFsConsumer extends DefaultConsumer implements Runnable
{
     @Override
     public void run() {
         DBCursor c = null;
-        java.util.Date fromDate = new java.util.Date();
+        java.util.Date fromDate = null;
+        
+        QueryStrategy s = endpoint.getQueryStrategy();
+        boolean usesTimestamp = (s != QueryStrategy.FileAttribute);
+        boolean persistsTimestamp = (s == QueryStrategy.PersistentTimestamp || s == QueryStrategy.PersistentTimestampAndFileAttribute);
+        boolean usesAttribute = (s == QueryStrategy.FileAttribute 
+            || s == QueryStrategy.TimeStampAndFileAttribute 
+            || s == QueryStrategy.PersistentTimestampAndFileAttribute);
+        
+        DBCollection ptsCollection = null;
+        DBObject persistentTimestamp = null;
+        if (persistsTimestamp) {
+            ptsCollection = endpoint.getDB().getCollection(endpoint.getPersistentTSCollection());
+         // ensure standard indexes as long as collections are small
+            try {
+                if (ptsCollection.count() < 1000) {
+                    ptsCollection.createIndex(new BasicDBObject("id", 1));
+                }
+            } catch (MongoException e) {
+                //TODO: Logging
+            }
+            persistentTimestamp = ptsCollection.findOne(new BasicDBObject("id", endpoint.getPersistentTSObject()));
+            if (persistentTimestamp == null) {
+                persistentTimestamp = new BasicDBObject("id", endpoint.getPersistentTSObject());
+                fromDate = new java.util.Date();
+                persistentTimestamp.put("timestamp", fromDate);
+                ptsCollection.save(persistentTimestamp);
+            }
+            fromDate = (java.util.Date)persistentTimestamp.get("timestamp");
+        } else if (usesTimestamp) {
+            fromDate = new java.util.Date();
+        }
         try {
             Thread.sleep(endpoint.getInitialDelay());
             while (isStarted()) {                
@@ -84,27 +117,54 @@ public class GridFsConsumer extends DefaultConsumer implements Runnable
{
                     } else {
                         query = (DBObject) JSON.parse(queryString);
                     }
-                    
-                    query.put("uploadDate", new BasicDBObject("$gte", fromDate));
+                    if (usesTimestamp) {
+                        query.put("uploadDate", new BasicDBObject("$gt", fromDate));
+                    }
+                    if (usesAttribute) {
+                        query.put(endpoint.getFileAttributeName(), null);
+                    }
                     c = endpoint.getFilesCollection().find(query);
-                    fromDate = new java.util.Date();
                 }
+                boolean dateModified = false;
                 while (c.hasNext() && isStarted()) {
                     GridFSDBFile file = (GridFSDBFile)c.next();
-                    file = endpoint.getGridFs().findOne(new BasicDBObject("_id", file.getId()));
-                    
-                    Exchange exchange = endpoint.createExchange();
-                    exchange.getIn().setHeader(GridFsEndpoint.GRIDFS_METADATA, JSON.serialize(file.getMetaData()));
-                    exchange.getIn().setHeader(Exchange.FILE_CONTENT_TYPE, file.getContentType());
-                    exchange.getIn().setHeader(Exchange.FILE_LENGTH, file.getLength());
-                    exchange.getIn().setHeader(Exchange.FILE_LAST_MODIFIED, file.getUploadDate());
-                    exchange.getIn().setBody(file.getInputStream(), InputStream.class);
-                    try {
-                        getProcessor().process(exchange);
-                    } catch (Exception e) {
-                        // TODO Auto-generated catch block
-                        e.printStackTrace();
+                    GridFSDBFile forig = file;
+                    if (usesAttribute) {
+                        file.put(endpoint.getFileAttributeName(), "processing");
+                        DBObject q = BasicDBObjectBuilder.start("_id", file.getId()).append("camel-processed",
null).get();
+                        forig = (GridFSDBFile)endpoint.getFilesCollection().findAndModify(q,
null, null, false, file, true, false);
                     }
+                    if (forig != null) {
+                        file = endpoint.getGridFs().findOne(new BasicDBObject("_id", file.getId()));
+                        
+                        Exchange exchange = endpoint.createExchange();
+                        exchange.getIn().setHeader(GridFsEndpoint.GRIDFS_METADATA, JSON.serialize(file.getMetaData()));
+                        exchange.getIn().setHeader(Exchange.FILE_CONTENT_TYPE, file.getContentType());
+                        exchange.getIn().setHeader(Exchange.FILE_LENGTH, file.getLength());
+                        exchange.getIn().setHeader(Exchange.FILE_LAST_MODIFIED, file.getUploadDate());
+                        exchange.getIn().setBody(file.getInputStream(), InputStream.class);
+                        try {
+                            getProcessor().process(exchange);
+                            //System.out.println("Processing " + file.getFilename());
+                            if (usesAttribute) {
+                                forig.put(endpoint.getFileAttributeName(), "done");
+                                endpoint.getFilesCollection().save(forig);
+                            }
+                            if (usesTimestamp) {
+                                if (file.getUploadDate().compareTo(fromDate) > 0) {
+                                    fromDate = file.getUploadDate();
+                                    dateModified = true;
+                                }
+                            }
+                        } catch (Exception e) {
+                            // TODO Auto-generated catch block
+                            e.printStackTrace();
+                        }
+                    }
+                }
+                if (persistsTimestamp && dateModified) {
+                    persistentTimestamp.put("timestamp", fromDate);
+                    ptsCollection.save(persistentTimestamp);
                 }
                 Thread.sleep(endpoint.getDelay());
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/45204104/src/main/java/org/apache/camel/component/gridfs/GridFsEndpoint.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/camel/component/gridfs/GridFsEndpoint.java b/src/main/java/org/apache/camel/component/gridfs/GridFsEndpoint.java
index 008e004..554c4cd 100644
--- a/src/main/java/org/apache/camel/component/gridfs/GridFsEndpoint.java
+++ b/src/main/java/org/apache/camel/component/gridfs/GridFsEndpoint.java
@@ -36,6 +36,14 @@ import org.slf4j.LoggerFactory;
 @UriEndpoint(scheme = "gridfs", title = "MongoDBGridFS", syntax = "gridfs:connectionBean",

             label = "database,nosql")
 public class GridFsEndpoint extends DefaultEndpoint {
+    
+    public enum QueryStrategy {
+        TimeStamp,
+        PersistentTimestamp,
+        FileAttribute,
+        TimeStampAndFileAttribute,
+        PersistentTimestampAndFileAttribute
+    };
     public static final String GRIDFS_OPERATION = "gridfs.operation";
     public static final String GRIDFS_METADATA = "gridfs.metadata";
     public static final String GRIDFS_CHUNKSIZE = "gridfs.chunksize";
@@ -64,7 +72,16 @@ public class GridFsEndpoint extends DefaultEndpoint {
     @UriParam
     private long delay = 500;
     
-    
+    @UriParam 
+    private QueryStrategy queryStrategy = QueryStrategy.TimeStamp;
+    @UriParam
+    private String persistentTSCollection = "camel-timestamps";
+    @UriParam
+    private String persistentTSObject = "camel-timestamp";
+    @UriParam
+    private String fileAttributeName = "camel-processed";
+
+
     private Mongo mongoConnection;
     private DB db;
     private GridFS gridFs;
@@ -154,6 +171,10 @@ public class GridFsEndpoint extends DefaultEndpoint {
         this.mongoConnection = mongoConnection;
     }
 
+    public DB getDB() {
+        return db;
+    }
+    
     public String getDatabase() {
         return database;
     }
@@ -186,6 +207,31 @@ public class GridFsEndpoint extends DefaultEndpoint {
         this.initialDelay = delay;
     }
     
+    public void setQueryStrategy(String s) {
+        queryStrategy = QueryStrategy.valueOf(s);
+    }
+    public QueryStrategy getQueryStrategy() {
+        return queryStrategy;
+    }
+    public void setPersistentTSCollection(String s) {
+        persistentTSCollection = s;
+    }
+    public String getPersistentTSCollection() {
+        return persistentTSCollection;
+    }
+    public void setPersistentTSObject(String s) {
+        persistentTSObject = s;
+    }
+    public String getPersistentTSObject() {
+        return persistentTSObject;
+    }
+    public void setFileAttributeName(String f) {
+        fileAttributeName = f;
+    }
+    public String getFileAttributeName() {
+        return fileAttributeName;
+    }   
+    
     /**
      * Set the {@link WriteConcern} for write operations on MongoDB using the standard ones.
      * Resolved from the fields of the WriteConcern class by calling the {@link WriteConcern#valueOf(String)}
method.

http://git-wip-us.apache.org/repos/asf/camel/blob/45204104/src/test/java/org/apache/camel/component/gridfs/GridFsConsumerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/camel/component/gridfs/GridFsConsumerTest.java b/src/test/java/org/apache/camel/component/gridfs/GridFsConsumerTest.java
index a84260c..77b1c6e 100644
--- a/src/test/java/org/apache/camel/component/gridfs/GridFsConsumerTest.java
+++ b/src/test/java/org/apache/camel/component/gridfs/GridFsConsumerTest.java
@@ -22,6 +22,8 @@ package org.apache.camel.component.gridfs;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.mongodb.gridfs.GridFS;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -37,14 +39,36 @@ public class GridFsConsumerTest extends AbstractMongoDbTest {
         return new RouteBuilder() {
             public void configure() {
                 from("direct:create").to("gridfs:myDb?database={{mongodb.testDb}}&operation=create&bucket="
+ getBucket());
+                from("direct:create-a").to("gridfs:myDb?database={{mongodb.testDb}}&operation=create&bucket="
+ getBucket() + "-a");
+                from("direct:create-pts").to("gridfs:myDb?database={{mongodb.testDb}}&operation=create&bucket="
+ getBucket() + "-pts");
+                
                 from("gridfs:myDb?database={{mongodb.testDb}}&bucket=" + getBucket()).convertBodyTo(String.class).to("mock:test");
+                from("gridfs:myDb?database={{mongodb.testDb}}&bucket=" + getBucket()
+ "-a&queryStrategy=FileAttribute")
+                    .convertBodyTo(String.class).to("mock:test");
+                from("gridfs:myDb?database={{mongodb.testDb}}&bucket=" + getBucket()
+ "-pts&queryStrategy=PersistentTimestamp")
+                    .convertBodyTo(String.class).to("mock:test");
             }
         };
     }
     
     
     @Test
-    public void test() throws Exception {
+    public void testTimestamp() throws Exception {
+        runTest("direct:create", gridfs);
+    }
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testAttribute() throws Exception {
+        runTest("direct:create-a", new GridFS(mongo.getDB("test"), getBucket() + "-a"));
+    }
+    
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testPersistentTS() throws Exception {
+        runTest("direct:create-pts", new GridFS(mongo.getDB("test"), getBucket() + "-pts"));
+    }
+    
+    public void runTest(String target, GridFS gridfs) throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:test");
         String data = "This is some stuff to go into the db";
         mock.expectedMessageCount(1);
@@ -55,7 +79,7 @@ public class GridFsConsumerTest extends AbstractMongoDbTest {
         assertEquals(0, gridfs.find(fn).size());
         
         headers.put(Exchange.FILE_NAME, fn);
-        template.requestBodyAndHeaders("direct:create", data, headers);
+        template.requestBodyAndHeaders(target, data, headers);
         
         mock.assertIsSatisfied();
         mock.reset();
@@ -64,11 +88,13 @@ public class GridFsConsumerTest extends AbstractMongoDbTest {
         mock.expectedBodiesReceived(data, data, data);
         
         headers.put(Exchange.FILE_NAME, fn + "_1");
-        template.requestBodyAndHeaders("direct:create", data, headers);
+        template.requestBodyAndHeaders(target, data, headers);
         headers.put(Exchange.FILE_NAME, fn + "_2");
-        template.requestBodyAndHeaders("direct:create", data, headers);
+        template.requestBodyAndHeaders(target, data, headers);
         headers.put(Exchange.FILE_NAME, fn + "_3");
-        template.requestBodyAndHeaders("direct:create", data, headers);
+        template.requestBodyAndHeaders(target, data, headers);
+        mock.assertIsSatisfied();
+        Thread.sleep(1000);
         mock.assertIsSatisfied();
     }
 


Mime
View raw message