camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [2/2] camel git commit: Support PushTopic replay extension
Date Tue, 21 Jun 2016 06:43:28 GMT
Support PushTopic replay extension

See Salesforce API documentation here: https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/replay_pushtopic_events.htm

Signed-off-by: Sune Keller <absukl@almbrand.dk>


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/68254a0b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/68254a0b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/68254a0b

Branch: refs/heads/master
Commit: 68254a0bc8c004d598dbecaaae3b7401b72613be
Parents: 615bc23
Author: Sune Keller <absukl@almbrand.dk>
Authored: Wed Jun 15 10:26:54 2016 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Tue Jun 21 08:40:03 2016 +0200

----------------------------------------------------------------------
 .../src/main/docs/salesforce.adoc               |  8 +-
 .../salesforce/SalesforceComponent.java         |  4 +-
 .../salesforce/SalesforceConsumer.java          |  5 ++
 .../salesforce/SalesforceEndpoint.java          |  2 +-
 .../salesforce/SalesforceEndpointConfig.java    | 39 ++++++++-
 .../salesforce/api/dto/SObjectUrls.java         |  9 ++
 .../streaming/CometDReplayExtension.java        | 88 ++++++++++++++++++++
 .../internal/streaming/SubscriptionHelper.java  | 42 +++++++++-
 8 files changed, 184 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce.adoc
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce.adoc
b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce.adoc
index 614b537..3f5c9cd 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce.adoc
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce.adoc
@@ -230,10 +230,8 @@ The Salesforce component supports 16 options which are listed below.
 
 
 
-
-
 // endpoint options: START
-The Salesforce component supports 37 endpoint options which are listed below:
+The Salesforce component supports 39 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
@@ -247,9 +245,11 @@ The Salesforce component supports 37 endpoint options which are listed
below:
 | apiVersion | common |  | String | Salesforce API version defaults to SalesforceEndpointConfig.DEFAULT_VERSION
 | batchId | common |  | String | Bulk API Batch ID
 | contentType | common |  | ContentType | Bulk API content type one of XML CSV ZIP_XML ZIP_CSV
+| defaultReplayId | common |  | Integer | Default replayId setting if no value is found in
link initialReplayIdMap
 | format | common |  | PayloadFormat | Payload format to use for Salesforce API calls either
JSON or XML defaults to JSON
 | httpClient | common |  | SalesforceHttpClient | Custom Jetty Http Client to use to connect
to Salesforce.
 | includeDetails | common |  | Boolean | Include details in Salesforce1 Analytics report
defaults to false.
+| initialReplayIdMap | common |  | Map | Replay IDs to start from per channel name.
 | instanceId | common |  | String | Salesforce1 Analytics report execution instance ID
 | jobId | common |  | String | Bulk API Job ID
 | notifyForFields | common |  | NotifyForFieldsEnum | Notify for fields options are ALL REFERENCED
SELECT WHERE
@@ -281,8 +281,6 @@ The Salesforce component supports 37 endpoint options which are listed
below:
 // endpoint options: END
 
 
-
-
 For obvious security reasons it is recommended that the clientId,
 clientSecret, userName and password fields be not set in the pom.xml.  
 The plugin should be configured for the rest of the properties, and can

http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
index 600dcbf..61357ea 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
@@ -303,10 +303,10 @@ public class SalesforceComponent extends UriEndpointComponent implements
Endpoin
         }
     }
 
-    public SubscriptionHelper getSubscriptionHelper() throws Exception {
+    public SubscriptionHelper getSubscriptionHelper(String topicName) throws Exception {
         if (subscriptionHelper == null) {
             // lazily create subscription helper
-            subscriptionHelper = new SubscriptionHelper(this);
+            subscriptionHelper = new SubscriptionHelper(this, topicName);
 
             // also start the helper to connect to Salesforce
             ServiceHelper.startService(subscriptionHelper);

http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
index b2fccfe..e606a81 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
@@ -47,6 +47,7 @@ public class SalesforceConsumer extends DefaultConsumer {
     private static final String TYPE_PROPERTY = "type";
     private static final String CREATED_DATE_PROPERTY = "createdDate";
     private static final String SOBJECT_PROPERTY = "sobject";
+    private static final String REPLAY_ID_PROPERTY = "replayId";
     private static final double MINIMUM_VERSION = 24.0;
 
     private final SalesforceEndpoint endpoint;
@@ -152,6 +153,7 @@ public class SalesforceConsumer extends DefaultConsumer {
         final Map<String, Object> event = (Map<String, Object>) data.get(EVENT_PROPERTY);
         final Object eventType = event.get(TYPE_PROPERTY);
         Object createdDate = event.get(CREATED_DATE_PROPERTY);
+        Object replayId = event.get(REPLAY_ID_PROPERTY);
         if (log.isDebugEnabled()) {
             log.debug(String.format("Received event %s on channel %s created on %s",
                     eventType, channel.getChannelId(), createdDate));
@@ -159,6 +161,9 @@ public class SalesforceConsumer extends DefaultConsumer {
 
         in.setHeader("CamelSalesforceEventType", eventType);
         in.setHeader("CamelSalesforceCreatedDate", createdDate);
+        if (replayId != null) {
+            in.setHeader("CamelSalesforceReplayId", replayId);
+        }
 
         // get SObject
         @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
index c0a9dac..0e8b3b1 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
@@ -75,7 +75,7 @@ public class SalesforceEndpoint extends DefaultEndpoint {
         }
 
         final SalesforceConsumer consumer = new SalesforceConsumer(this, processor,
-            getComponent().getSubscriptionHelper());
+            getComponent().getSubscriptionHelper(topicName));
         configureConsumer(consumer);
         return consumer;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
index 36a22af..39d0d06 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
@@ -72,6 +72,10 @@ public class SalesforceEndpointConfig implements Cloneable {
     public static final String REPORT_METADATA = "reportMetadata";
     public static final String INSTANCE_ID = "instanceId";
 
+    // parameters for Streaming API
+    public static final String DEFAULT_REPLAY_ID = "defaultReplayId";
+    public static final String INITIAL_REPLAY_ID_MAP = "initialReplayIdMap";
+
     // default maximum authentication retries on failed authentication or expired session
     public static final int DEFAULT_MAX_AUTHENTICATION_RETRIES = 4;
 
@@ -143,6 +147,12 @@ public class SalesforceEndpointConfig implements Cloneable {
     @UriParam
     private String instanceId;
 
+    // Streaming API properties
+    @UriParam
+    private Integer defaultReplayId;
+    @UriParam
+    private Map<String, Integer> initialReplayIdMap;
+
     // Salesforce Jetty9 HttpClient, set using reference
     @UriParam
     private SalesforceHttpClient httpClient;
@@ -535,6 +545,33 @@ public class SalesforceEndpointConfig implements Cloneable {
         valueMap.put(REPORT_METADATA, reportMetadata);
         valueMap.put(INSTANCE_ID, instanceId);
 
+        // add streaming API properties
+        valueMap.put(DEFAULT_REPLAY_ID, defaultReplayId);
+        valueMap.put(INITIAL_REPLAY_ID_MAP, initialReplayIdMap);
+
         return Collections.unmodifiableMap(valueMap);
     }
-}
\ No newline at end of file
+
+    public Integer getDefaultReplayId() {
+        return defaultReplayId;
+    }
+
+    /**
+     * Default replayId setting if no value is found in {@link #initialReplayIdMap}
+     * @param defaultReplayId
+     */
+    public void setDefaultReplayId(Integer defaultReplayId) {
+        this.defaultReplayId = defaultReplayId;
+    }
+
+    public Map<String, Integer> getInitialReplayIdMap() {
+        return initialReplayIdMap;
+    }
+
+    /**
+     * Replay IDs to start from per channel name.
+     */
+    public void setInitialReplayIdMap(Map<String, Integer> initialReplayIdMap) {
+        this.initialReplayIdMap = initialReplayIdMap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/SObjectUrls.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/SObjectUrls.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/SObjectUrls.java
index 85115d1..e4ac3e2 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/SObjectUrls.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/SObjectUrls.java
@@ -31,6 +31,7 @@ public class SObjectUrls extends AbstractDTOBase {
     private String compactLayouts;
     private String caseRowArticleSuggestions;
     private String push;
+    private String defaultValues;
 
     public String getSobject() {
         return sobject;
@@ -135,4 +136,12 @@ public class SObjectUrls extends AbstractDTOBase {
     public void setPush(String push) {
         this.push = push;
     }
+
+    public String getDefaultValues() {
+        return defaultValues;
+    }
+
+    public void setDefaultValues(String defaultValues) {
+        this.defaultValues = defaultValues;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java
new file mode 100644
index 0000000..98df4ef
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.streaming;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.cometd.bayeux.Channel;
+import org.cometd.bayeux.Message;
+import org.cometd.bayeux.client.ClientSession;
+import org.cometd.bayeux.client.ClientSession.Extension.Adapter;
+
+/**
+ * CometDReplayExtension, typical usages are the following:
+ * {@code client.addExtension(new CometDReplayExtension<>(replayMap));}
+ *
+ * @author yzhao
+ * @since 198 (Winter '16)
+ */
+public class CometDReplayExtension<V> extends Adapter {
+    private static final String EXTENSION_NAME = "replay";
+    private final ConcurrentMap<String, V> dataMap = new ConcurrentHashMap<>();
+    private final AtomicBoolean supported = new AtomicBoolean();
+
+    public CometDReplayExtension(Map<String, V> dataMap) {
+        this.dataMap.putAll(dataMap);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public boolean rcv(ClientSession session, Message.Mutable message) {
+        Object data = message.get(EXTENSION_NAME);
+        if (this.supported.get() && data != null) {
+            try {
+                dataMap.put(message.getChannel(), (V) data);
+            } catch (ClassCastException e) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public boolean rcvMeta(ClientSession session, Message.Mutable message) {
+        switch (message.getChannel()) {
+        case Channel.META_HANDSHAKE:
+            Map<String, Object> ext = message.getExt(false);
+            this.supported.set(ext != null && Boolean.TRUE.equals(ext.get(EXTENSION_NAME)));
+            break;
+        default:
+            break;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean sendMeta(ClientSession session, Message.Mutable message) {
+        switch (message.getChannel()) {
+        case Channel.META_HANDSHAKE:
+            message.getExt(true).put(EXTENSION_NAME, Boolean.TRUE);
+            break;
+        case Channel.META_SUBSCRIBE:
+            if (supported.get()) {
+                message.getExt(true).put(EXTENSION_NAME, dataMap);
+            }
+            break;
+        default:
+            break;
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/68254a0b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 1cc4a21..befd168 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.salesforce.internal.streaming;
 
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -31,6 +33,8 @@ import org.apache.camel.component.salesforce.SalesforceHttpClient;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.apache.camel.support.ServiceSupport;
 import org.cometd.bayeux.Message;
+import org.cometd.bayeux.client.ClientSession;
+import org.cometd.bayeux.client.ClientSession.Extension;
 import org.cometd.bayeux.client.ClientSessionChannel;
 import org.cometd.client.BayeuxClient;
 import org.cometd.client.transport.ClientTransport;
@@ -56,6 +60,8 @@ public class SubscriptionHelper extends ServiceSupport {
 
     private static final String EXCEPTION_FIELD = "exception";
 
+    private static final double MINIMUM_REPLAY_VERSION = 36.0;
+
     private final SalesforceComponent component;
     private final SalesforceSession session;
     private final BayeuxClient client;
@@ -71,14 +77,14 @@ public class SubscriptionHelper extends ServiceSupport {
     private String connectError;
     private boolean reconnecting;
 
-    public SubscriptionHelper(SalesforceComponent component) throws Exception {
+    public SubscriptionHelper(SalesforceComponent component, String topicName) throws Exception
{
         this.component = component;
         this.session = component.getSession();
 
         this.listenerMap = new ConcurrentHashMap<SalesforceConsumer, ClientSessionChannel.MessageListener>();
 
         // create CometD client
-        this.client = createClient();
+        this.client = createClient(topicName);
     }
 
     @Override
@@ -179,7 +185,7 @@ public class SubscriptionHelper extends ServiceSupport {
         }
     }
 
-    private BayeuxClient createClient() throws Exception {
+    private BayeuxClient createClient(String topicName) throws Exception {
         // use default Jetty client from SalesforceComponent, its shared by all consumers
         final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
 
@@ -203,6 +209,29 @@ public class SubscriptionHelper extends ServiceSupport {
         };
 
         BayeuxClient client = new BayeuxClient(getEndpointUrl(), transport);
+        Integer replayId = null;
+        String channelName = getChannelName(topicName);
+        Map<String, Integer> replayIdMap = component.getConfig().getInitialReplayIdMap();
+        if (replayIdMap != null) {
+            replayId = replayIdMap.get(channelName);
+        }
+        if (replayId == null) {
+            replayId = component.getConfig().getDefaultReplayId();
+        }
+        if (replayId != null) {
+            LOG.info("Sending replayId={} for channel {}", replayId, channelName);
+            List<Extension> extensions = client.getExtensions();
+            Extension ext = null;
+            for (Iterator<Extension> iter = extensions.iterator(); iter.hasNext();
ext = iter.next()) {
+                if (ext instanceof CometDReplayExtension) {
+                    iter.remove();
+                }
+            }
+            Map<String, Integer> dataMap = new HashMap<>();
+            dataMap.put(channelName, replayId);
+            ClientSession.Extension extension = new CometDReplayExtension<>(dataMap);
+            client.addExtension(extension);
+        }
         return client;
     }
 
@@ -344,7 +373,12 @@ public class SubscriptionHelper extends ServiceSupport {
     }
 
     public String getEndpointUrl() {
-        return component.getSession().getInstanceUrl() + "/cometd/" + component.getConfig().getApiVersion();
+        if (Double.valueOf(component.getConfig().getApiVersion()) >= MINIMUM_REPLAY_VERSION
+            && (component.getConfig().getDefaultReplayId() != null || !component.getConfig().getInitialReplayIdMap().isEmpty()))
{
+            return component.getSession().getInstanceUrl() + "/cometd/replay/" + component.getConfig().getApiVersion();
+        } else {
+            return component.getSession().getInstanceUrl() + "/cometd/" + component.getConfig().getApiVersion();
+        }
     }
 
 }


Mime
View raw message