Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A9DE618B0A for ; Wed, 16 Dec 2015 13:22:50 +0000 (UTC) Received: (qmail 37727 invoked by uid 500); 16 Dec 2015 13:22:50 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 37594 invoked by uid 500); 16 Dec 2015 13:22:50 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 37473 invoked by uid 99); 16 Dec 2015 13:22:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Dec 2015 13:22:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B321CE099B; Wed, 16 Dec 2015 13:22:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Wed, 16 Dec 2015 13:22:47 -0000 Message-Id: <6333bb05c14d4420bbccdd21c1757046@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/7] camel git commit: Added basic DynamoDb Stream component. Added basic DynamoDb Stream component. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/78fd81e5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/78fd81e5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/78fd81e5 Branch: refs/heads/master Commit: 78fd81e5f7861e6fbde3fb9d63519bc1e775c93c Parents: 3da84a6 Author: Candle Authored: Mon Dec 7 09:33:10 2015 +0000 Committer: Claus Ibsen Committed: Wed Dec 16 14:19:11 2015 +0100 ---------------------------------------------------------------------- .../aws/ddbstream/DdbStreamComponent.java | 44 +++++++ .../aws/ddbstream/DdbStreamConsumer.java | 128 +++++++++++++++++++ .../aws/ddbstream/DdbStreamEndpoint.java | 117 +++++++++++++++++ .../org/apache/camel/component/aws-ddbstream | 18 +++ 4 files changed, 307 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java new file mode 100644 index 0000000..559597b --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamComponent.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.ddbstream; + +import java.util.Map; +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.impl.UriEndpointComponent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DdbStreamComponent extends UriEndpointComponent { + private static final Logger LOG = LoggerFactory.getLogger(DdbStreamComponent.class); + + public DdbStreamComponent() { + super(DdbStreamEndpoint.class); + } + + public DdbStreamComponent(CamelContext context) { + super(context, DdbStreamEndpoint.class); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { + DdbStreamEndpoint endpoint = new DdbStreamEndpoint(uri, remaining, this); + + LOG.debug("Created endpoint: {}", endpoint.toString()); + return endpoint; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java new file mode 100644 index 0000000..88d2ba5 --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.ddbstream; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; +import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest; +import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult; +import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest; +import com.amazonaws.services.dynamodbv2.model.GetRecordsResult; +import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest; +import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult; +import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest; +import com.amazonaws.services.dynamodbv2.model.ListStreamsResult; +import com.amazonaws.services.dynamodbv2.model.Record; +import java.util.ArrayDeque; +import java.util.List; +import java.util.Queue; +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.impl.ScheduledBatchPollingConsumer; +import org.apache.camel.util.CastUtils; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DdbStreamConsumer extends ScheduledBatchPollingConsumer { + private static final Logger LOG = LoggerFactory.getLogger(DdbStreamConsumer.class); + + private String currentShardIterator = null; + + public DdbStreamConsumer(DdbStreamEndpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + /* + * Returns the number of messages polled. + */ + @Override + protected int poll() throws Exception { + GetRecordsRequest req = new GetRecordsRequest() + .withShardIterator(getShardItertor()) + .withLimit(getEndpoint().getMaxResultsPerRequest()) + ; + GetRecordsResult result = getClient().getRecords(req); + + Queue exchanges = createExchanges(result.getRecords()); + int processedExchangeCount = processBatch(CastUtils.cast(exchanges)); + + currentShardIterator = result.getNextShardIterator(); + + return processedExchangeCount; + } + + @Override + public int processBatch(Queue exchanges) throws Exception { + int processedExchanges = 0; + while (!exchanges.isEmpty()) { + final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll()); + + LOG.trace("Processing exchange [{}] started.", exchange); + getAsyncProcessor().process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + LOG.trace("Processing exchange [{}] done.", exchange); + } + }); + processedExchanges++; + } + return processedExchanges; + } + + private AmazonDynamoDBStreams getClient() { + return getEndpoint().getClient(); + } + + @Override + public DdbStreamEndpoint getEndpoint() { + return (DdbStreamEndpoint) super.getEndpoint(); + } + + private String getShardItertor() { + // either return a cached one or get a new one via a GetShardIterator request. + if (currentShardIterator == null) { + ListStreamsRequest req0 = new ListStreamsRequest() + .withTableName(getEndpoint().getTableName()) + ; + ListStreamsResult res0 = getClient().listStreams(req0); + final String streamArn = res0.getStreams().get(0).getStreamArn(); // XXX assumes there is only one stream + DescribeStreamRequest req1 = new DescribeStreamRequest() + .withStreamArn(streamArn) + ; + DescribeStreamResult res1 = getClient().describeStream(req1); + + GetShardIteratorRequest req = new GetShardIteratorRequest() + .withStreamArn(streamArn) + .withShardId(res1.getStreamDescription().getShards().get(0).getShardId()) // XXX only uses the first shard + .withShardIteratorType(getEndpoint().getIteratorType()) + ; + GetShardIteratorResult result = getClient().getShardIterator(req); + currentShardIterator = result.getShardIterator(); + } + LOG.trace("Shard Iterator is: {}", currentShardIterator); + return currentShardIterator; + } + + private Queue createExchanges(List records) { + Queue exchanges = new ArrayDeque<>(); + for (Record record : records) { + exchanges.add(getEndpoint().createExchange(record)); + } + return exchanges; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java new file mode 100644 index 0000000..18c042d --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.ddbstream; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; +import com.amazonaws.services.dynamodbv2.model.Record; +import com.amazonaws.services.dynamodbv2.model.ShardIteratorType; +import org.apache.camel.Consumer; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.ScheduledPollEndpoint; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; + +@UriEndpoint(scheme = "aws-ddbstream", title = "AWS Kinesis", syntax = "aws-ddbstream:tableName", consumerClass = DdbStreamConsumer.class, label = "cloud,messaging,streams") +public class DdbStreamEndpoint extends ScheduledPollEndpoint { + + @UriPath(label = "consumer,producer", description = "Name of the dynamodb table") + @Metadata(required = "true") + private String tableName; + + // For now, always assume that we've been supplied a client in the Camel registry. + @UriParam(label = "consumer", description = "Amazon DynamoDB client to use for all requests for this endpoint") + @Metadata(required = "true") + private AmazonDynamoDBStreams amazonDynamoDbStreamsClient; + + @UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll") + private int maxResultsPerRequest = 1; + + @UriParam(label = "consumer", description = "Defines where in the DynaboDB stream to start getting records") + private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON; + + public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) { + super(uri, component); + this.tableName = tableName; + } + + @Override + public Producer createProducer() throws Exception { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + DdbStreamConsumer consumer = new DdbStreamConsumer(this, processor); + consumer.setSchedulerProperties(consumer.getEndpoint().getSchedulerProperties()); + return consumer; + } + + Exchange createExchange(Record record) { + Exchange ex = super.createExchange(); + ex.getIn().setBody(record, Record.class); + + return ex; + } + + @Override + public boolean isSingleton() { + // probably right. + return true; + } + + AmazonDynamoDBStreams getClient() { + return amazonDynamoDbStreamsClient; + } + + // required for injection. + public AmazonDynamoDBStreams getAmazonDynamoDBStreamsClient() { + return amazonDynamoDbStreamsClient; + } + + public void setAmazonDynamoDbStreamsClient(AmazonDynamoDBStreams amazonDynamoDbStreamsClient) { + this.amazonDynamoDbStreamsClient = amazonDynamoDbStreamsClient; + } + + public int getMaxResultsPerRequest() { + return maxResultsPerRequest; + } + + public void setMaxResultsPerRequest(int maxResultsPerRequest) { + this.maxResultsPerRequest = maxResultsPerRequest; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public ShardIteratorType getIteratorType() { + return iteratorType; + } + + public void setIteratorType(ShardIteratorType iteratorType) { + this.iteratorType = iteratorType; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/78fd81e5/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream new file mode 100644 index 0000000..48a8509 --- /dev/null +++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-ddbstream @@ -0,0 +1,18 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +class=org.apache.camel.component.aws.ddbstream.DdbStreamComponent