camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject [camel] 01/04: [CAMEL-12763] Improve the logic to relate the consumer to the producer in the direct component
Date Mon, 03 Sep 2018 10:11:49 GMT
This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a5008b39b08a89bc49e83cf4b7a41db8f7defbf4
Author: Guillaume Nodet <gnodet@gmail.com>
AuthorDate: Wed Aug 29 16:58:06 2018 +0200

    [CAMEL-12763] Improve the logic to relate the consumer to the producer in the direct component
---
 .../component/direct/DirectBlockingProducer.java   | 96 ----------------------
 .../camel/component/direct/DirectConsumer.java     | 11 +--
 .../camel/component/direct/DirectEndpoint.java     | 70 ++++++++++++----
 .../camel/component/direct/DirectProducer.java     | 37 +++++++--
 4 files changed, 83 insertions(+), 131 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java
deleted file mode 100644
index 76e98bc..0000000
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.direct;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultAsyncProducer;
-import org.apache.camel.util.StopWatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The direct producer.
- * <p/>
- * If blocking is enabled ({@code DirectEndpoint#isBlock}) then the DirectEndpoint will create an instance
- * of this class instead of {@code DirectProducer}.
- * This producers {@code process} method will block for the configured duration ({@code DirectEndpoint#getTimeout},
- * default to 30 seconds). After which if a consumer is still unavailable a DirectConsumerNotAvailableException
- * will be thrown.
- * <p/>
- * Implementation note: Concurrent Producers will block for the duration it takes to determine if a
- * consumer is available, but actual consumer execution will happen concurrently.
- */
-public class DirectBlockingProducer extends DefaultAsyncProducer {
-    private static final Logger LOG = LoggerFactory.getLogger(DirectBlockingProducer.class);
-    private final DirectEndpoint endpoint;
-
-    public DirectBlockingProducer(DirectEndpoint endpoint) {
-        super(endpoint);
-        this.endpoint = endpoint;
-    }
-
-    public void process(Exchange exchange) throws Exception {
-        getConsumer(exchange).getProcessor().process(exchange);
-    }
-
-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        try {
-            return getConsumer(exchange).getAsyncProcessor().process(exchange, callback);
-        } catch (Exception e) {
-            exchange.setException(e);
-            callback.done(true);
-            return true;
-        }
-    }
-
-    protected DirectConsumer getConsumer(Exchange exchange) throws Exception {
-        DirectConsumer answer = endpoint.getConsumer();
-        if (answer == null) {
-            // okay then await until we have a consumer or we timed out
-            answer = awaitConsumer();
-            if (answer == null) {
-                throw new DirectConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
-            }
-        }
-
-        return answer;
-    }
-
-    private DirectConsumer awaitConsumer() throws InterruptedException {
-        DirectConsumer answer = null;
-
-        StopWatch watch = new StopWatch();
-        boolean done = false;
-        while (!done) {
-            // sleep a bit to give chance for the consumer to be ready
-            Thread.sleep(500);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Waited {} for consumer to be ready", watch.taken());
-            }
-
-            answer = endpoint.getConsumer();
-            if (answer != null) {
-                return answer;
-            }
-            // we are done if we hit the timeout
-            done = watch.taken() >= endpoint.getTimeout();
-        }
-        return answer;
-    }
-
-}
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
index 9e097f8..debed19 100644
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
@@ -45,14 +45,7 @@ public class DirectConsumer extends DefaultConsumer implements ShutdownAware, Su
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        // add consumer to endpoint
-        boolean existing = this == endpoint.getConsumer();
-        if (!existing && endpoint.hasConsumer(this)) {
-            throw new IllegalArgumentException("Cannot add a 2nd consumer to the same endpoint. Endpoint " + endpoint + " only allows one consumer.");
-        }
-        if (!existing) {
-            endpoint.addConsumer(this);
-        }
+        endpoint.addConsumer(this);
     }
 
     @Override
@@ -69,7 +62,7 @@ public class DirectConsumer extends DefaultConsumer implements ShutdownAware, Su
     @Override
     protected void doResume() throws Exception {
         // resume by using the start logic
-        doStart();
+        endpoint.addConsumer(this);
     }
 
     public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
index 8347c5d..e93e973 100644
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.component.direct;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.Component;
@@ -29,6 +31,8 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.StringHelper;
 
 /**
  * The direct component provides direct, synchronous call to another endpoint from the same CamelContext.
@@ -38,7 +42,8 @@ import org.apache.camel.util.ObjectHelper;
 @UriEndpoint(firstVersion = "1.0.0", scheme = "direct", title = "Direct", syntax = "direct:name", consumerClass = DirectConsumer.class, label = "core,endpoint")
 public class DirectEndpoint extends DefaultEndpoint {
 
-    private volatile Map<String, DirectConsumer> consumers;
+    private final Map<String, DirectConsumer> consumers;
+    private final List<DirectProducer> producers = new ArrayList<>();
 
     @UriPath(description = "Name of direct endpoint") @Metadata(required = "true")
     private String name;
@@ -55,7 +60,7 @@ public class DirectEndpoint extends DefaultEndpoint {
     }
 
     public DirectEndpoint(String endpointUri, Component component) {
-        this(endpointUri, component, new HashMap<String, DirectConsumer>());
+        this(endpointUri, component, new HashMap<>());
     }
 
     public DirectEndpoint(String uri, Component component, Map<String, DirectConsumer> consumers) {
@@ -64,11 +69,7 @@ public class DirectEndpoint extends DefaultEndpoint {
     }
 
     public Producer createProducer() throws Exception {
-        if (block) {
-            return new DirectBlockingProducer(this);
-        } else {
-            return new DirectProducer(this);
-        }
+        return new DirectProducer(this);
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
@@ -82,23 +83,58 @@ public class DirectEndpoint extends DefaultEndpoint {
     }
 
     public void addConsumer(DirectConsumer consumer) {
-        String key = consumer.getEndpoint().getKey();
-        consumers.put(key, consumer);
+        String key = getKey();
+        synchronized (consumers) {
+            if (consumers.putIfAbsent(key, consumer) != null) {
+                throw new IllegalArgumentException("Cannot add a 2nd consumer to the same endpoint. Endpoint " + this + " only allows one consumer.");
+            }
+            consumers.notifyAll();
+        }
     }
 
     public void removeConsumer(DirectConsumer consumer) {
-        String key = consumer.getEndpoint().getKey();
-        consumers.remove(key);
+        String key = getKey();
+        synchronized (consumers) {
+            consumers.remove(key, consumer);
+            consumers.notifyAll();
+        }
     }
 
-    public boolean hasConsumer(DirectConsumer consumer) {
-        String key = consumer.getEndpoint().getKey();
-        return consumers.containsKey(key);
+    public void addProducer(DirectProducer producer) {
+        synchronized (consumers) {
+            producers.add(producer);
+        }
     }
 
-    public DirectConsumer getConsumer() {
+    public void removeProducer(DirectProducer producer) {
+        synchronized (consumers) {
+            producers.remove(producer);
+        }
+    }
+
+    protected DirectConsumer getConsumer() throws InterruptedException {
         String key = getKey();
-        return consumers.get(key);
+        synchronized (consumers) {
+            DirectConsumer answer = consumers.get(key);
+            if (answer == null && block) {
+                StopWatch watch = new StopWatch();
+                for (;;) {
+                    answer = consumers.get(key);
+                    if (answer != null) {
+                        break;
+                    }
+                    long rem = timeout - watch.taken();
+                    if (rem <= 0) {
+                        break;
+                    }
+                    consumers.wait(rem);
+                }
+            }
+//            if (answer != null && answer.getEndpoint() != this) {
+//                throw new IllegalStateException();
+//            }
+            return answer;
+        }
     }
 
     public boolean isBlock() {
@@ -140,7 +176,7 @@ public class DirectEndpoint extends DefaultEndpoint {
     protected String getKey() {
         String uri = getEndpointUri();
         if (uri.indexOf('?') != -1) {
-            return ObjectHelper.before(uri, "?");
+            return StringHelper.before(uri, "?");
         } else {
             return uri;
         }
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
index c4bcf2d..126ea9c 100644
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
@@ -36,30 +36,49 @@ public class DirectProducer extends DefaultAsyncProducer {
         this.endpoint = endpoint;
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        endpoint.addProducer(this);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        endpoint.removeProducer(this);
+        super.doStop();
+    }
+
     public void process(Exchange exchange) throws Exception {
-        if (endpoint.getConsumer() == null) {
+        DirectConsumer consumer = endpoint.getConsumer();
+        if (consumer == null) {
             if (endpoint.isFailIfNoConsumers()) {
                 throw new DirectConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
             } else {
                 LOG.debug("message ignored, no consumers available on endpoint: {}", endpoint);
             }
         } else {
-            endpoint.getConsumer().getProcessor().process(exchange);
+            consumer.getProcessor().process(exchange);
         }
     }
 
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        if (endpoint.getConsumer() == null) {
-            if (endpoint.isFailIfNoConsumers()) {
-                // indicate its done synchronously
-                exchange.setException(new DirectConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange));
+        try {
+            DirectConsumer consumer = endpoint.getConsumer();
+            if (consumer == null) {
+                if (endpoint.isFailIfNoConsumers()) {
+                    exchange.setException(new DirectConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange));
+                } else {
+                    LOG.debug("message ignored, no consumers available on endpoint: {}", endpoint);
+                }
+                callback.done(true);
+                return true;
             } else {
-                LOG.debug("message ignored, no consumers available on endpoint: {}", endpoint);
+                return consumer.getAsyncProcessor().process(exchange, callback);
             }
+        } catch (Exception e) {
+            exchange.setException(e);
             callback.done(true);
             return true;
-        } else {
-            return endpoint.getConsumer().getAsyncProcessor().process(exchange, callback);
         }
     }
 


Mime
View raw message