camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject [1/2] git commit: CAMEL-7654 Supports Message Attributes with HeaderFilterStrategy in AWS SQS component
Date Wed, 13 Aug 2014 09:50:39 GMT
Repository: camel
Updated Branches:
  refs/heads/master e7c9e405a -> 1e9878801


CAMEL-7654 Supports Message Attributes with HeaderFilterStrategy in AWS SQS component


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1e987880
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1e987880
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1e987880

Branch: refs/heads/master
Commit: 1e98788019bc1b4ebdd26fcbaa248ffa7a9715df
Parents: 2ca8187
Author: Willem Jiang <willem.jiang@gmail.com>
Authored: Wed Aug 13 17:44:15 2014 +0800
Committer: Willem Jiang <willem.jiang@gmail.com>
Committed: Wed Aug 13 17:49:50 2014 +0800

----------------------------------------------------------------------
 .../camel/component/aws/sqs/SqsEndpoint.java    | 27 ++++++++++++++--
 .../aws/sqs/SqsHeaderFilterStrategy.java        | 30 ++++++++++++++++++
 .../camel/component/aws/sqs/SqsProducer.java    | 33 +++++++++++++-------
 .../component/aws/sqs/SqsProducerTest.java      |  8 +++--
 4 files changed, 81 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1e987880/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
index b48fbe6..8ed85c5 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
@@ -31,7 +31,6 @@ import com.amazonaws.services.sqs.model.ListQueuesResult;
 import com.amazonaws.services.sqs.model.MessageAttributeValue;
 import com.amazonaws.services.sqs.model.QueueAttributeName;
 import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
-
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -40,6 +39,8 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.ScheduledPollEndpoint;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +50,7 @@ import org.slf4j.LoggerFactory;
  * Defines the <a href="http://camel.apache.org/aws.html">AWS SQS Endpoint</a>.
 
  *
  */
-public class SqsEndpoint extends ScheduledPollEndpoint {
+public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterStrategyAware
{
     
     private static final Logger LOG = LoggerFactory.getLogger(SqsEndpoint.class);
     
@@ -57,12 +58,21 @@ public class SqsEndpoint extends ScheduledPollEndpoint {
     private String queueUrl;
     private SqsConfiguration configuration;
     private int maxMessagesPerPoll;
+    private HeaderFilterStrategy headerFilterStrategy;
 
     public SqsEndpoint(String uri, SqsComponent component, SqsConfiguration configuration)
{
         super(uri, component);
         this.configuration = configuration;
     }
+    
+    public HeaderFilterStrategy getHeaderFilterStrategy() {
+        return headerFilterStrategy;
+    }
 
+    public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
+        this.headerFilterStrategy = strategy;
+    }
+   
     public Producer createProducer() throws Exception {
         return new SqsProducer(this);
     }
@@ -87,6 +97,11 @@ public class SqsEndpoint extends ScheduledPollEndpoint {
         if (ObjectHelper.isNotEmpty(getConfiguration().getAmazonSQSEndpoint())) {
             client.setEndpoint(getConfiguration().getAmazonSQSEndpoint());
         }
+        
+        // check the setting the headerFilterStrategy
+        if (headerFilterStrategy == null) {
+            headerFilterStrategy = new SqsHeaderFilterStrategy();
+        }
 
         // If both region and Account ID is provided the queue URL can be built manually.
         // This allows accessing queues where you don't have permission to list queues or
query queues
@@ -191,10 +206,16 @@ public class SqsEndpoint extends ScheduledPollEndpoint {
         message.setHeader(SqsConstants.ATTRIBUTES, msg.getAttributes());
         message.setHeader(SqsConstants.MESSAGE_ATTRIBUTES, msg.getMessageAttributes());
         
+        //Need to apply the SqsHeaderFilterStrategy this time
+        HeaderFilterStrategy headerFilterStrategy = getHeaderFilterStrategy();
         //add all sqs message attributes as camel message headers so that knowledge of 
         //the Sqs class MessageAttributeValue will not leak to the client
         for (Entry<String, MessageAttributeValue> entry : msg.getMessageAttributes().entrySet())
{
-            message.setHeader(entry.getKey(), translateValue(entry.getValue()));
+            String header = entry.getKey();
+            Object value = translateValue(entry.getValue());
+            if (!headerFilterStrategy.applyFilterToExternalHeaders(header, value, exchange))
{
+                message.setHeader(header, value);
+            }
         }
         return exchange;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/1e987880/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsHeaderFilterStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsHeaderFilterStrategy.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsHeaderFilterStrategy.java
new file mode 100644
index 0000000..fb5f425
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsHeaderFilterStrategy.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sqs;
+
+import org.apache.camel.impl.DefaultHeaderFilterStrategy;
+
+public class SqsHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+    public SqsHeaderFilterStrategy() {
+        initialize();  
+    }
+
+    protected void initialize() {
+        // filter headers begin with "Camel" or "org.apache.camel"
+        setOutFilterPattern("(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*"); 
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1e987880/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
index 0a4c948..874e021 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
@@ -25,11 +25,11 @@ import com.amazonaws.services.sqs.AmazonSQS;
 import com.amazonaws.services.sqs.model.MessageAttributeValue;
 import com.amazonaws.services.sqs.model.SendMessageRequest;
 import com.amazonaws.services.sqs.model.SendMessageResult;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoFactoryAvailableException;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +50,7 @@ public class SqsProducer extends DefaultProducer {
     public void process(Exchange exchange) throws Exception {
         String body = exchange.getIn().getBody(String.class);
         SendMessageRequest request = new SendMessageRequest(getQueueUrl(), body);
-        request.setMessageAttributes(translateAttributes(exchange.getIn().getHeaders()));
+        request.setMessageAttributes(translateAttributes(exchange.getIn().getHeaders(), exchange));
         addDelay(request, exchange);
 
         LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange);
@@ -106,19 +106,28 @@ public class SqsProducer extends DefaultProducer {
         return "SqsProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) +
"]";
     }
     
-    private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object>
headers) {
+    private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object>
headers, Exchange exchange) {
         Map<String, MessageAttributeValue> result = new HashMap<String, MessageAttributeValue>();
+        HeaderFilterStrategy headerFilterStrategy = getEndpoint().getHeaderFilterStrategy();
         for (Entry<String, Object> entry : headers.entrySet()) {
-            Object value = entry.getValue();
-            MessageAttributeValue mav = new MessageAttributeValue();
-            if (value instanceof String) {
-                mav.setDataType("String");
-                mav.withStringValue((String)value);
-            } else if (value instanceof ByteBuffer) {
-                mav.setDataType("Binary");
-                mav.withBinaryValue((ByteBuffer)value);
+            // only put the message header which is not filtered into the message attribute
+            if (!headerFilterStrategy.applyFilterToCamelHeaders(entry.getKey(), entry.getValue(),
exchange)) {
+                Object value = entry.getValue();
+                if (value instanceof String) {
+                    MessageAttributeValue mav = new MessageAttributeValue();
+                    mav.setDataType("String");
+                    mav.withStringValue((String)value);
+                    result.put(entry.getKey(), mav);
+                } else if (value instanceof ByteBuffer) {
+                    MessageAttributeValue mav = new MessageAttributeValue();
+                    mav.setDataType("Binary");
+                    mav.withBinaryValue((ByteBuffer)value);
+                    result.put(entry.getKey(), mav);
+                } else {
+                    // cannot translate the message header to message attribute value
+                    LOG.warn("Cannot put the message header key={0}, value={1} into Sqs MessageAttribute",
entry.getKey(), entry.getValue());
+                }
             }
-            result.put(entry.getKey(), mav);
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/1e987880/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
index e6742ca..bed350c 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
@@ -23,17 +23,16 @@ import java.util.Map;
 import com.amazonaws.services.sqs.AmazonSQSClient;
 import com.amazonaws.services.sqs.model.SendMessageRequest;
 import com.amazonaws.services.sqs.model.SendMessageResult;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
+import org.apache.camel.spi.HeaderFilterStrategy;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.any;
@@ -54,6 +53,8 @@ public class SqsProducerTest {
     private static final ByteBuffer SAMPLE_MESSAGE_HEADER_VALUE_2 = ByteBuffer.wrap(new byte[10]);
     private static final String SAMPLE_MESSAGE_HEADER_NAME_3 = "header_name_3";
     private static final String SAMPLE_MESSAGE_HEADER_VALUE_3 = "heder_value_3";
+    private static final String SAMPLE_MESSAGE_HEADER_NAME_4 = "CamelHeader_1";
+    private static final String SAMPLE_MESSAGE_HEADER_VALUE_4 = "testValue";
     
     Exchange exchange = mock(Exchange.class, RETURNS_DEEP_STUBS);
 
@@ -72,6 +73,7 @@ public class SqsProducerTest {
         underTest = new SqsProducer(sqsEndpoint);
         sendMessageResult = new SendMessageResult().withMD5OfMessageBody(MESSAGE_MD5).withMessageId(MESSAGE_ID);
         sqsConfiguration = new SqsConfiguration();
+        HeaderFilterStrategy headerFilterStrategy = new SqsHeaderFilterStrategy();
         sqsConfiguration.setDelaySeconds(Integer.valueOf(0));
         when(sqsEndpoint.getClient()).thenReturn(amazonSQSClient);
         when(sqsEndpoint.getConfiguration()).thenReturn(sqsConfiguration);
@@ -81,6 +83,7 @@ public class SqsProducerTest {
         when(exchange.getPattern()).thenReturn(ExchangePattern.InOnly);
         when(inMessage.getBody(String.class)).thenReturn(SAMPLE_MESSAGE_BODY);
         when(sqsEndpoint.getQueueUrl()).thenReturn(QUEUE_URL);
+        when(sqsEndpoint.getHeaderFilterStrategy()).thenReturn(headerFilterStrategy);
     }
 
     @Test
@@ -173,6 +176,7 @@ public class SqsProducerTest {
         headers.put(SAMPLE_MESSAGE_HEADER_NAME_1, SAMPLE_MESSAGE_HEADER_VALUE_1);
         headers.put(SAMPLE_MESSAGE_HEADER_NAME_2, SAMPLE_MESSAGE_HEADER_VALUE_2);
         headers.put(SAMPLE_MESSAGE_HEADER_NAME_3, SAMPLE_MESSAGE_HEADER_VALUE_3);
+        headers.put(SAMPLE_MESSAGE_HEADER_NAME_4, SAMPLE_MESSAGE_HEADER_VALUE_4);
         when(inMessage.getHeaders()).thenReturn(headers);
         underTest.process(exchange);
 


Mime
View raw message