This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit a5008b39b08a89bc49e83cf4b7a41db8f7defbf4
Author: Guillaume Nodet <gnodet@gmail.com>
AuthorDate: Wed Aug 29 16:58:06 2018 +0200
[CAMEL-12763] Improve the logic to relate the consumer to the producer in the direct component
---
.../component/direct/DirectBlockingProducer.java | 96 ----------------------
.../camel/component/direct/DirectConsumer.java | 11 +--
.../camel/component/direct/DirectEndpoint.java | 70 ++++++++++++----
.../camel/component/direct/DirectProducer.java | 37 +++++++--
4 files changed, 83 insertions(+), 131 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java
b/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java
deleted file mode 100644
index 76e98bc..0000000
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectBlockingProducer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.direct;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultAsyncProducer;
-import org.apache.camel.util.StopWatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The direct producer.
- * <p/>
- * If blocking is enabled ({@code DirectEndpoint#isBlock}) then the DirectEndpoint will create
an instance
- * of this class instead of {@code DirectProducer}.
- * This producers {@code process} method will block for the configured duration ({@code DirectEndpoint#getTimeout},
- * default to 30 seconds). After which if a consumer is still unavailable a DirectConsumerNotAvailableException
- * will be thrown.
- * <p/>
- * Implementation note: Concurrent Producers will block for the duration it takes to determine
if a
- * consumer is available, but actual consumer execution will happen concurrently.
- */
-public class DirectBlockingProducer extends DefaultAsyncProducer {
- private static final Logger LOG = LoggerFactory.getLogger(DirectBlockingProducer.class);
- private final DirectEndpoint endpoint;
-
- public DirectBlockingProducer(DirectEndpoint endpoint) {
- super(endpoint);
- this.endpoint = endpoint;
- }
-
- public void process(Exchange exchange) throws Exception {
- getConsumer(exchange).getProcessor().process(exchange);
- }
-
- public boolean process(Exchange exchange, AsyncCallback callback) {
- try {
- return getConsumer(exchange).getAsyncProcessor().process(exchange, callback);
- } catch (Exception e) {
- exchange.setException(e);
- callback.done(true);
- return true;
- }
- }
-
- protected DirectConsumer getConsumer(Exchange exchange) throws Exception {
- DirectConsumer answer = endpoint.getConsumer();
- if (answer == null) {
- // okay then await until we have a consumer or we timed out
- answer = awaitConsumer();
- if (answer == null) {
- throw new DirectConsumerNotAvailableException("No consumers available on
endpoint: " + endpoint, exchange);
- }
- }
-
- return answer;
- }
-
- private DirectConsumer awaitConsumer() throws InterruptedException {
- DirectConsumer answer = null;
-
- StopWatch watch = new StopWatch();
- boolean done = false;
- while (!done) {
- // sleep a bit to give chance for the consumer to be ready
- Thread.sleep(500);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Waited {} for consumer to be ready", watch.taken());
- }
-
- answer = endpoint.getConsumer();
- if (answer != null) {
- return answer;
- }
- // we are done if we hit the timeout
- done = watch.taken() >= endpoint.getTimeout();
- }
- return answer;
- }
-
-}
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
index 9e097f8..debed19 100644
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
@@ -45,14 +45,7 @@ public class DirectConsumer extends DefaultConsumer implements ShutdownAware,
Su
@Override
protected void doStart() throws Exception {
super.doStart();
- // add consumer to endpoint
- boolean existing = this == endpoint.getConsumer();
- if (!existing && endpoint.hasConsumer(this)) {
- throw new IllegalArgumentException("Cannot add a 2nd consumer to the same endpoint.
Endpoint " + endpoint + " only allows one consumer.");
- }
- if (!existing) {
- endpoint.addConsumer(this);
- }
+ endpoint.addConsumer(this);
}
@Override
@@ -69,7 +62,7 @@ public class DirectConsumer extends DefaultConsumer implements ShutdownAware,
Su
@Override
protected void doResume() throws Exception {
// resume by using the start logic
- doStart();
+ endpoint.addConsumer(this);
}
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
b/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
index 8347c5d..e93e973 100644
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
@@ -16,7 +16,9 @@
*/
package org.apache.camel.component.direct;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.camel.Component;
@@ -29,6 +31,8 @@ import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.StringHelper;
/**
* The direct component provides direct, synchronous call to another endpoint from the same
CamelContext.
@@ -38,7 +42,8 @@ import org.apache.camel.util.ObjectHelper;
@UriEndpoint(firstVersion = "1.0.0", scheme = "direct", title = "Direct", syntax = "direct:name",
consumerClass = DirectConsumer.class, label = "core,endpoint")
public class DirectEndpoint extends DefaultEndpoint {
- private volatile Map<String, DirectConsumer> consumers;
+ private final Map<String, DirectConsumer> consumers;
+ private final List<DirectProducer> producers = new ArrayList<>();
@UriPath(description = "Name of direct endpoint") @Metadata(required = "true")
private String name;
@@ -55,7 +60,7 @@ public class DirectEndpoint extends DefaultEndpoint {
}
public DirectEndpoint(String endpointUri, Component component) {
- this(endpointUri, component, new HashMap<String, DirectConsumer>());
+ this(endpointUri, component, new HashMap<>());
}
public DirectEndpoint(String uri, Component component, Map<String, DirectConsumer>
consumers) {
@@ -64,11 +69,7 @@ public class DirectEndpoint extends DefaultEndpoint {
}
public Producer createProducer() throws Exception {
- if (block) {
- return new DirectBlockingProducer(this);
- } else {
- return new DirectProducer(this);
- }
+ return new DirectProducer(this);
}
public Consumer createConsumer(Processor processor) throws Exception {
@@ -82,23 +83,58 @@ public class DirectEndpoint extends DefaultEndpoint {
}
public void addConsumer(DirectConsumer consumer) {
- String key = consumer.getEndpoint().getKey();
- consumers.put(key, consumer);
+ String key = getKey();
+ synchronized (consumers) {
+ if (consumers.putIfAbsent(key, consumer) != null) {
+ throw new IllegalArgumentException("Cannot add a 2nd consumer to the same
endpoint. Endpoint " + this + " only allows one consumer.");
+ }
+ consumers.notifyAll();
+ }
}
public void removeConsumer(DirectConsumer consumer) {
- String key = consumer.getEndpoint().getKey();
- consumers.remove(key);
+ String key = getKey();
+ synchronized (consumers) {
+ consumers.remove(key, consumer);
+ consumers.notifyAll();
+ }
}
- public boolean hasConsumer(DirectConsumer consumer) {
- String key = consumer.getEndpoint().getKey();
- return consumers.containsKey(key);
+ public void addProducer(DirectProducer producer) {
+ synchronized (consumers) {
+ producers.add(producer);
+ }
}
- public DirectConsumer getConsumer() {
+ public void removeProducer(DirectProducer producer) {
+ synchronized (consumers) {
+ producers.remove(producer);
+ }
+ }
+
+ protected DirectConsumer getConsumer() throws InterruptedException {
String key = getKey();
- return consumers.get(key);
+ synchronized (consumers) {
+ DirectConsumer answer = consumers.get(key);
+ if (answer == null && block) {
+ StopWatch watch = new StopWatch();
+ for (;;) {
+ answer = consumers.get(key);
+ if (answer != null) {
+ break;
+ }
+ long rem = timeout - watch.taken();
+ if (rem <= 0) {
+ break;
+ }
+ consumers.wait(rem);
+ }
+ }
+// if (answer != null && answer.getEndpoint() != this) {
+// throw new IllegalStateException();
+// }
+ return answer;
+ }
}
public boolean isBlock() {
@@ -140,7 +176,7 @@ public class DirectEndpoint extends DefaultEndpoint {
protected String getKey() {
String uri = getEndpointUri();
if (uri.indexOf('?') != -1) {
- return ObjectHelper.before(uri, "?");
+ return StringHelper.before(uri, "?");
} else {
return uri;
}
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
b/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
index c4bcf2d..126ea9c 100644
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
@@ -36,30 +36,49 @@ public class DirectProducer extends DefaultAsyncProducer {
this.endpoint = endpoint;
}
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ endpoint.addProducer(this);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ endpoint.removeProducer(this);
+ super.doStop();
+ }
+
public void process(Exchange exchange) throws Exception {
- if (endpoint.getConsumer() == null) {
+ DirectConsumer consumer = endpoint.getConsumer();
+ if (consumer == null) {
if (endpoint.isFailIfNoConsumers()) {
throw new DirectConsumerNotAvailableException("No consumers available on
endpoint: " + endpoint, exchange);
} else {
LOG.debug("message ignored, no consumers available on endpoint: {}", endpoint);
}
} else {
- endpoint.getConsumer().getProcessor().process(exchange);
+ consumer.getProcessor().process(exchange);
}
}
public boolean process(Exchange exchange, AsyncCallback callback) {
- if (endpoint.getConsumer() == null) {
- if (endpoint.isFailIfNoConsumers()) {
- // indicate its done synchronously
- exchange.setException(new DirectConsumerNotAvailableException("No consumers
available on endpoint: " + endpoint, exchange));
+ try {
+ DirectConsumer consumer = endpoint.getConsumer();
+ if (consumer == null) {
+ if (endpoint.isFailIfNoConsumers()) {
+ exchange.setException(new DirectConsumerNotAvailableException("No consumers
available on endpoint: " + endpoint, exchange));
+ } else {
+ LOG.debug("message ignored, no consumers available on endpoint: {}",
endpoint);
+ }
+ callback.done(true);
+ return true;
} else {
- LOG.debug("message ignored, no consumers available on endpoint: {}", endpoint);
+ return consumer.getAsyncProcessor().process(exchange, callback);
}
+ } catch (Exception e) {
+ exchange.setException(e);
callback.done(true);
return true;
- } else {
- return endpoint.getConsumer().getAsyncProcessor().process(exchange, callback);
}
}
|