camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [7/7] camel git commit: Correctly support the LATEST shard iterator type by starting with the last shard in the stream description.
Date Wed, 16 Dec 2015 13:22:52 GMT
Correctly support the LATEST shard iterator type by starting with the last shard in the stream
description.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e3b86b97
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e3b86b97
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e3b86b97

Branch: refs/heads/master
Commit: e3b86b977bbcda16d35c936dab77ed7b07f90e6e
Parents: 7180893
Author: Candle <candle@candle.me.uk>
Authored: Wed Dec 16 10:34:21 2015 +0000
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Wed Dec 16 14:19:13 2015 +0100

----------------------------------------------------------------------
 .../component/aws/ddbstream/DdbStreamConsumer.java  | 10 +++++++++-
 .../component/aws/ddbstream/DdbStreamEndpoint.java  | 16 +++++++++++++++-
 .../camel/component/aws/ddbstream/ShardList.java    | 13 +++++++++++++
 .../component/aws/ddbstream/ShardListTest.java      |  8 ++++++++
 4 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index 0a6a83c..f5223c0 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -107,7 +107,15 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
 
             LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
             if (currentShard == null) {
-                currentShard = shardList.first();
+                switch(getEndpoint().getIteratorType()) {
+                case TRIM_HORIZON:
+                    currentShard = shardList.first();
+                    break;
+                default:
+                case LATEST:
+                    currentShard = shardList.last();
+                    break;
+                }
             } else {
                 currentShard = shardList.nextAfter(currentShard);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
index 66a7461..543c432 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -29,7 +29,9 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 
-@UriEndpoint(scheme = "aws-ddbstream", title = "AWS DynamoDB Streams", consumerOnly = true, syntax = "aws-ddbstream:tableName", consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams")
+@UriEndpoint(scheme = "aws-ddbstream", title = "AWS DynamoDB Streams",
+        consumerOnly = true, syntax = "aws-ddbstream:tableName",
+        consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams")
 public class DdbStreamEndpoint extends ScheduledPollEndpoint {
 
     @UriPath(label = "consumer", description = "Name of the dynamodb table")
@@ -56,6 +58,8 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
     // This can be done by having the type of the parameter an interface
     // and supplying a default implementation and a converter from a long/String
     // to an instance of this interface.
+    // Note that the shard list needs to have the ability to start at the shard
+    // that includes the supplied sequence number
 
     public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) {
         super(uri, component);
@@ -86,6 +90,16 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
         return true;
     }
 
+    @Override
+    public String toString() {
+        return "DdbStreamEndpoint{"
+                + "tableName=" + tableName
+                + ", amazonDynamoDbStreamsClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest
+                + ", iteratorType="
+                + iteratorType + ", uri=" + getEndpointUri()
+                + '}';
+    }
+
     AmazonDynamoDBStreams getClient() {
         return amazonDynamoDbStreamsClient;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index 6e804f5..a0df179 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -57,6 +57,19 @@ class ShardList {
         throw new IllegalStateException("Unable to find an unparented shard in " + shards);
     }
 
+    Shard last() {
+        Map<String, Shard> shardsByParent = new HashMap<>();
+        for (Shard shard : shards.values()) {
+            shardsByParent.put(shard.getParentShardId(), shard);
+        }
+        for (Shard shard : shards.values()) {
+            if (!shardsByParent.containsKey(shard.getShardId())) {
+                return shard;
+            }
+        }
+        throw new IllegalStateException("Unable to find a shard with no children " + shards);
+    }
+
     /**
      * Removes shards that are older than the provided shard.
      * Does not remove the provided shard.

http://git-wip-us.apache.org/repos/asf/camel/blob/e3b86b97/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
index 60f3d46..1b7249a 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
@@ -102,6 +102,14 @@ public class ShardListTest {
     }
 
     @Test
+    public void lastShardGetsTheShardWithNoChildren() throws Exception {
+        ShardList shards = new ShardList();
+        shards.addAll(createShards("a", "b", "c", "d"));
+
+        assertThat(shards.last().getShardId(), is("d"));
+    }
+
+    @Test
     public void removingShards() throws Exception {
         ShardList shards = new ShardList();
         shards.addAll(createShards(null, "a", "b", "c", "d"));


Mime
View raw message