flink-issues mailing list archives

From GJL <...@git.apache.org>
Subject [GitHub] flink pull request #4979: RMQSource support disabling queue declaration
Date Fri, 19 Jan 2018 14:19:20 GMT
Github user GJL commented on a diff in the pull request:

    --- Diff: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
    @@ -138,7 +138,9 @@ protected ConnectionFactory setupConnectionFactory() throws Exception
     	 * defining custom queue parameters)
     	protected void setupQueue() throws IOException {
    -		channel.queueDeclare(queueName, true, false, false, null);
    +		if (rmqConnectionConfig.isQueueDeclaration()) {
    --- End diff --
    Thanks for your contribution to Apache Flink @sihuazhou!  I have reviewed your code, and
I am not sure if the additional flag is needed. The original author of the `RMQSource` declared
this method protected, which means that if you do not want the queue to be declared, you can
simply override the method with an empty implementation. For example:
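    				env.addSource(new RMQSource<String>(
    						connectionConfig, "queueName", // placeholders: your RMQConnectionConfig and queue name
    						new SimpleStringSchema()) {
    					@Override
    					protected void setupQueue() {
    						// do not declare queue
    					}
    				});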
    This intent is also reflected in the Javadoc:
    	 * Sets up the queue. The default implementation just declares the queue. The user may override
    	 * this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
    	 * defining custom queue parameters)
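
To show the override mechanism in isolation, here is a minimal, dependency-free sketch. `FakeChannel`, `BaseSource`, and `declaredAfterOpen` are hypothetical stand-ins invented for illustration; they are not the Flink or RabbitMQ classes and only mimic the shape of a protected setup hook that is called during `open()`.

```java
import java.util.ArrayList;
import java.util.List;

public class SetupQueueOverrideSketch {

    /** Hypothetical stand-in for a RabbitMQ channel; it only records declared queues. */
    static class FakeChannel {
        final List<String> declaredQueues = new ArrayList<>();

        void queueDeclare(String queueName) {
            declaredQueues.add(queueName);
        }
    }

    /** Hypothetical stand-in for a source that exposes a protected setup hook. */
    static class BaseSource {
        protected final FakeChannel channel = new FakeChannel();
        protected final String queueName;

        BaseSource(String queueName) {
            this.queueName = queueName;
        }

        /** Default behavior: declare the queue. Subclasses may override this. */
        protected void setupQueue() {
            channel.queueDeclare(queueName);
        }

        /** Lifecycle method that invokes the hook, as a connector's open() might. */
        void open() {
            setupQueue();
        }
    }

    /** Runs open() and returns how many queues were declared as a result. */
    static int declaredAfterOpen(BaseSource source) {
        source.open();
        return source.channel.declaredQueues.size();
    }

    public static void main(String[] args) {
        BaseSource defaultSource = new BaseSource("orders");
        BaseSource noDeclareSource = new BaseSource("orders") {
            @Override
            protected void setupQueue() {
                // do not declare queue
            }
        };
        System.out.println(declaredAfterOpen(defaultSource));   // prints 1
        System.out.println(declaredAfterOpen(noDeclareSource)); // prints 0
    }
}
```

With this shape, skipping the declaration needs no extra configuration flag: the subclass simply replaces the hook with an empty body.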
    Moreover, `RMQSink#setupQueue` also declares the queue by default, which is not addressed
in your pull request. Please let me know what you think @sihuazhou 
    cc: @tzulitai @zentol 

