flink-commits mailing list archives

From trohrm...@apache.org
Subject [flink] branch release-1.9 updated: [FLINK-14562] Let RabbitMQ source close consumer and channel on close
Date Sun, 03 Nov 2019 20:12:42 GMT
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 93f2c68  [FLINK-14562] Let RabbitMQ source close consumer and channel on close
93f2c68 is described below

commit 93f2c68c02080d30044924640db9643fdeb4dba8
Author: Nicolas Deslandes <nicolas.deslandes@appdirect.com>
AuthorDate: Tue Oct 29 10:41:59 2019 -0400

    [FLINK-14562] Let RabbitMQ source close consumer and channel on close
    
    Closing method of RabbitMQ source must close consumer and channel in order to prevent
    leaving idle consumer
    
    This closes #10036.
---
 .../streaming/connectors/rabbitmq/RMQSource.java      | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index d454153..f079369 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -177,6 +177,25 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	@Override
 	public void close() throws Exception {
 		super.close();
+
+		try {
+			if (consumer != null && channel != null) {
+				channel.basicCancel(consumer.getConsumerTag());
+			}
+		} catch (IOException e) {
+			throw new RuntimeException("Error while cancelling RMQ consumer on " + queueName
+				+ " at " + rmqConnectionConfig.getHost(), e);
+		}
+
+		try {
+			if (channel != null) {
+				channel.close();
+			}
+		} catch (IOException e) {
+			throw new RuntimeException("Error while closing RMQ channel with " + queueName
+				+ " at " + rmqConnectionConfig.getHost(), e);
+		}
+
 		try {
 			if (connection != null) {
 				connection.close();

