camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject camel git commit: Camel connector allow to do custom logic before producer or consumer does anything.
Date Fri, 07 Apr 2017 13:14:09 GMT
Repository: camel
Updated Branches:
  refs/heads/master 38c3cfa50 -> 3e1f66d04


 Camel connector allow to do custom logic before producer or consumer does anything.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3e1f66d0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3e1f66d0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3e1f66d0

Branch: refs/heads/master
Commit: 3e1f66d04d352a890bc591e99afd3a94d51e5499
Parents: 38c3cfa
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Fri Apr 7 15:13:59 2017 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Fri Apr 7 15:13:59 2017 +0200

----------------------------------------------------------------------
 .../connector/ConnectorConsumerProcessor.java   | 69 ++++++++++++++++++++
 .../component/connector/ConnectorProducer.java  |  4 ++
 .../connector/DefaultConnectorEndpoint.java     |  4 +-
 3 files changed, 75 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3e1f66d0/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
new file mode 100644
index 0000000..b8d6eaa
--- /dev/null
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.connector;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.processor.DelegateAsyncProcessor;
+import org.apache.camel.support.SynchronizationAdapter;
+
+/**
+ * Connector {@link Processor} which is capable of performing before and after custom processing
+ * while consuming a message (ie from the consumer).
+ */
+public class ConnectorConsumerProcessor extends DelegateAsyncProcessor {
+
+    private final Processor beforeConsumer;
+    private final Processor afterConsumer;
+
+    public ConnectorConsumerProcessor(Processor processor, Processor beforeConsumer, Processor afterConsumer) {
+        super(processor);
+        this.beforeConsumer = beforeConsumer;
+        this.afterConsumer = afterConsumer;
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (afterConsumer != null) {
+            exchange.getUnitOfWork().addSynchronization(new SynchronizationAdapter() {
+                @Override
+                public void onAfterRoute(Route route, Exchange exchange) {
+                    try {
+                        afterConsumer.process(exchange);
+                    } catch (Exception e) {
+                        // ignore
+                    }
+                }
+            });
+        }
+
+        if (beforeConsumer != null) {
+            try {
+                beforeConsumer.process(exchange);
+            } catch (Throwable e) {
+                exchange.setException(e);
+                callback.done(true);
+                return true;
+            }
+        }
+
+        return super.process(exchange, callback);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3e1f66d0/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
index b0d7225..2904ac6 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
@@ -25,6 +25,10 @@ import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.ServiceHelper;
 
+/**
+ * Connector {@link Producer} which is capable of performing before and after custom processing
+ * while processing (ie sending the message).
+ */
 public class ConnectorProducer extends DefaultProducer {
 
     private final Producer producer;

http://git-wip-us.apache.org/repos/asf/camel/blob/3e1f66d0/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
index ba9e93b..ac9b2a9 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
@@ -49,8 +49,8 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        Consumer answer = endpoint.createConsumer(processor);
-        return answer;
+        ConnectorConsumerProcessor delegate = new ConnectorConsumerProcessor(processor, getComponent().getBeforeConsumer(), getComponent().getAfterConsumer());
+        return endpoint.createConsumer(delegate);
     }
 
     @Override


Mime
View raw message