flink-user mailing list archives

From Paul Joireman <paul.joire...@physiq.com>
Subject Re: Reply: Re: Using RabbitMQ Sinks
Date Wed, 10 Aug 2016 12:31:06 GMT
Hi all,


Thanks for the help. The problem with the sinks was an issue with the queue created on the RabbitMQ server. I initially created it with the rabbitmqadmin web UI; after I deleted it and allowed Flink to create it, everything now works fine.
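
This matches how AMQP queue declaration behaves: declaring a queue that already exists succeeds only when the declared properties are equivalent to the existing ones; otherwise the broker closes the channel with 406 PRECONDITION_FAILED. Below is a minimal sketch of that rule as a toy in-memory model. It is not the RabbitMQ client or Flink's connector; `ToyBroker` and `QueueProps` are hypothetical names, and only three properties are compared for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Toy in-memory model of the queue.declare equivalence check (hypothetical, not the RabbitMQ client). */
final class ToyBroker {

    /** Simplified subset of the properties compared when a queue is redeclared. */
    static final class QueueProps {
        final boolean durable;
        final boolean exclusive;
        final boolean autoDelete;

        QueueProps(boolean durable, boolean exclusive, boolean autoDelete) {
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof QueueProps)) {
                return false;
            }
            QueueProps p = (QueueProps) o;
            return durable == p.durable && exclusive == p.exclusive && autoDelete == p.autoDelete;
        }

        @Override
        public int hashCode() {
            return Objects.hash(durable, exclusive, autoDelete);
        }
    }

    private final Map<String, QueueProps> queues = new HashMap<>();

    /** Creates the queue if absent; throws if it exists with non-equivalent properties. */
    void queueDeclare(String name, QueueProps props) {
        QueueProps existing = queues.putIfAbsent(name, props);
        if (existing != null && !existing.equals(props)) {
            throw new IllegalStateException(
                "406 PRECONDITION_FAILED - parameters for queue '" + name + "' not equivalent");
        }
    }

    /** Removes the queue so that the next declaration can recreate it with its own properties. */
    void queueDelete(String name) {
        queues.remove(name);
    }
}
```

In this model, a queue first declared with one set of properties rejects a later declaration with different ones, and deleting the queue lets the second declarer recreate it, which is the sequence that resolved the problem here.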


Paul

________________________________
From: rimin515@sina.cn <rimin515@sina.cn>
Sent: Wednesday, August 10, 2016 5:27:52 AM
To: user
Subject: Reply: Re: Using RabbitMQ Sinks

Hi,
I use the following code:
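  // Imports as of Flink 1.1 (package names may differ in other versions)
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
  import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
  import org.apache.flink.streaming.util.serialization.SimpleStringSchema

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(1000) // checkpoint every 1000 ms

    // Connection details (left blank or elided in the original message)
    val host = ""
    val queue = ""
    val port = ***
    val user = ""
    val pass = ""

    // Build the RabbitMQ connection configuration
    val rmqConnectionConfig = new RMQConnectionConfig.Builder()
      .setHost(host)
      .setPort(port)
      .setUserName(user)
      .setPassword(pass)
      .setVirtualHost("/")
      .build()
    //println(rmqConnectionConfig.getHost)

    // Read strings from the queue; `false` means correlation IDs are not used
    val streamrdd = env.addSource(new RMQSource(rmqConnectionConfig, queue, false, new SimpleStringSchema))
    streamrdd.print()
    println("**************************************")
    env.execute("read rabbitmq")

  }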

This works and reads the data. I think you may have a wrong RabbitMQ configuration, or you may be using the wrong package version;
for example, I use 1.1-SNAPSHOT.
-------------------------------


----- Original Message -----
From: Paul Joireman <paul.joireman@physiq.com>
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Using RabbitMQ Sinks
Date: August 8, 2016, 23:27


Robert,


It looks like the root cause exception is:


com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
The full execution trace is shown below and occurs when I try to execute my process. In the process, I initially create a connection to RMQ using the RMQConnectionConfig.Builder(), and this works fine if I read using a SimpleStringSchema, but it fails when I re-use the same connection configuration for the sink as follows:

msgs.addSink(new RMQSink<String>(rmqConnConfig, ALERT_SINK_QUEUE, new SimpleStringSchema()));

Where msgs are Strings.
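
When triaging an error like the one quoted above from log text, the reply code, reply text, and failing method can be pulled out of the message. Below is a minimal sketch, assuming the message format shown in this thread; `AmqpReply` is a hypothetical helper, not part of the RabbitMQ client or Flink. In AMQP 0-9-1, class-id 50 with method-id 10 identifies queue.declare, which points at the queue declaration rather than at publishing.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that parses channel.close details out of an exception message. */
final class AmqpReply {
    // Matches the "(reply-code=..., reply-text=..., class-id=..., method-id=...)" part of the message.
    private static final Pattern CLOSE = Pattern.compile(
        "reply-code=(\\d+), reply-text=(.*?), class-id=(\\d+), method-id=(\\d+)");

    final int replyCode;
    final String replyText;
    final int classId;
    final int methodId;

    private AmqpReply(int replyCode, String replyText, int classId, int methodId) {
        this.replyCode = replyCode;
        this.replyText = replyText;
        this.classId = classId;
        this.methodId = methodId;
    }

    /** Returns the parsed reply, or null if the text carries no channel.close details. */
    static AmqpReply parse(String message) {
        Matcher m = CLOSE.matcher(message);
        if (!m.find()) {
            return null;
        }
        return new AmqpReply(
            Integer.parseInt(m.group(1)),
            m.group(2),
            Integer.parseInt(m.group(3)),
            Integer.parseInt(m.group(4)));
    }

    /** True when the failing method is queue.declare (class 50, method 10 in AMQP 0-9-1). */
    boolean isQueueDeclare() {
        return classId == 50 && methodId == 10;
    }
}
```

Applied to the message quoted above, `AmqpReply.parse(...)` yields reply code 406 and `isQueueDeclare()` returns true.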

Paul

Connected to the target VM, address: '127.0.0.1:42208', transport: 'socket'
10:13:50,555 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class com.physiq.alert.AlertMessageIn is not a valid POJO type
10:13:57,035 INFO  org.apache.flink.streaming.api.environment.LocalStreamEnvironment  - Running job on local embedded Flink mini cluster
10:13:58,100 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Starting FlinkMiniCluster.
10:13:58,580 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
10:13:58,614 INFO  org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory /tmp/blobStore-b19e22df-4636-4e96-bb45-db70c0bcc7e1
10:13:58,620 INFO  org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:34360 - max concurrent requests: 50 - max backlog: 1000
10:13:58,635 INFO  org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory  - Using job manager savepoint state backend.
10:13:58,642 INFO  org.apache.flink.runtime.metrics.MetricRegistry               - No metrics reporter configured, no metrics will be exposed/reported.
10:13:58,655 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist           - Started memory archivist akka://flink/user/archive_1
10:13:58,657 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Starting JobManager at akka://flink/user/jobmanager_1.
10:13:58,678 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
10:13:58,683 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Messages between TaskManager and JobManager have a max timeout of 10000 milliseconds
10:13:58,703 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Temporary file directory '/tmp': total 102 GB, usable 9 GB (8.82% usable)
10:13:58,844 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
10:13:58,846 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Limiting managed memory to 549 MB, memory will be allocated lazily.
10:14:00,163 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /tmp/flink-io-e6686803-fb13-4e2f-a4df-cb6ca465eb2b for spill files.
10:14:00,167 INFO  org.apache.flink.runtime.jobmanager.JobManager                - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID None.
10:14:00,239 INFO  org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /tmp/flink-dist-cache-2e8256c3-532c-45b6-ad2d-fa7b43846a6e
10:14:00,239 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager_1#-1689614413] - leader session null
10:14:00,495 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager actor at akka://flink/user/taskmanager_1#2008243751.
10:14:00,496 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager data connection information: localhost (dataPort=58335)
10:14:00,496 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager has 4 task slot(s).
10:14:00,498 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage stats: [HEAP: 77/195/1701 MB, NON HEAP: 22/33/214 MB (used/committed/max)]
10:14:00,506 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
10:14:00,508 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  - TaskManager ResourceID{resourceId='8ca61ba2f8741d54ce649bd0d525d50c'} has started.
10:14:00,511 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as db51cf9997f8059543464810ffaffea3. Current number of registered hosts is 1. Current number of alive task slots is 4.
10:14:00,517 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
10:14:00,534 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Determined BLOB server address to be localhost/127.0.0.1:34360. Starting BLOB cache.
10:14:00,535 INFO  org.apache.flink.runtime.blob.BlobCache                       - Created BLOB cache storage directory /tmp/blobStore-1a9647ad-11f0-40de-b24c-0e2c57c030d0
10:14:00,535 INFO  org.apache.flink.runtime.metrics.MetricRegistry               - No metrics reporter configured, no metrics will be exposed/reported.
10:14:00,549 INFO  org.apache.flink.runtime.client.JobClientActor                - Received job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705).
10:14:00,549 INFO  org.apache.flink.runtime.client.JobClientActor                - Could not submit job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705), because there is no connection to a JobManager.
10:14:00,551 INFO  org.apache.flink.runtime.client.JobClientActor                - Disconnect from JobManager null.
10:14:00,557 INFO  org.apache.flink.runtime.client.JobClientActor                - Connect to JobManager Actor[akka://flink/user/jobmanager_1#-1689614413].
10:14:00,557 INFO  org.apache.flink.runtime.client.JobClientActor                - Connected to new JobManager akka://flink/user/jobmanager_1.
10:14:00,557 INFO  org.apache.flink.runtime.client.JobClientActor                - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705) and wait for progress
10:14:00,558 INFO  org.apache.flink.runtime.client.JobClientActor                - Upload jar files to job manager akka://flink/user/jobmanager_1.
10:14:00,560 INFO  org.apache.flink.runtime.client.JobClientActor                - Submit job to the job manager akka://flink/user/jobmanager_1.
10:14:00,563 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Submitting job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job).
10:14:00,568 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Using restart strategy NoRestartStrategy for 6d641f666b26088a843355ff3b0b1705.
10:14:00,628 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Scheduling job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job).
10:14:00,628 INFO  org.apache.flink.runtime.client.JobClientActor                - Job was successfully submitted to the JobManager akka://flink/user/jobmanager_1.
10:14:00,631 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from CREATED to SCHEDULED
10:14:00,631 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Job execution switched to status RUNNING.
08/08/2016 10:14:00 Job execution switched to status RUNNING.
10:14:00,632 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to SCHEDULED
08/08/2016 10:14:00 Source: Custom Source(1/1) switched to SCHEDULED
10:14:00,633 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job) changed to RUNNING.
10:14:00,637 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from SCHEDULED to DEPLOYING
10:14:00,638 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to DEPLOYING
08/08/2016 10:14:00 Source: Custom Source(1/1) switched to DEPLOYING
10:14:00,638 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Source: Custom Source (1/1) (attempt #0) to localhost
10:14:00,642 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from CREATED to SCHEDULED
10:14:00,642 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(1/4) switched to SCHEDULED
08/08/2016 10:14:00 Map(1/4) switched to SCHEDULED
10:14:00,643 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from SCHEDULED to DEPLOYING
10:14:00,643 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Map (1/4) (attempt #0) to localhost
10:14:00,643 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(1/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(1/4) switched to DEPLOYING
10:14:00,648 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from CREATED to SCHEDULED
10:14:00,648 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from SCHEDULED to DEPLOYING
10:14:00,649 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(2/4) switched to SCHEDULED
08/08/2016 10:14:00 Map(2/4) switched to SCHEDULED
10:14:00,649 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(2/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(2/4) switched to DEPLOYING
10:14:00,649 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Map (2/4) (attempt #0) to localhost
10:14:00,650 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from CREATED to SCHEDULED
10:14:00,650 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(3/4) switched to SCHEDULED
10:14:00,650 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Map(3/4) switched to SCHEDULED
10:14:00,655 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Map (3/4) (attempt #0) to localhost
10:14:00,656 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from CREATED to SCHEDULED
10:14:00,657 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from SCHEDULED to DEPLOYING
10:14:00,657 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(3/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(3/4) switched to DEPLOYING
10:14:00,659 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Map (4/4) (attempt #0) to localhost
10:14:00,659 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(4/4) switched to SCHEDULED
08/08/2016 10:14:00 Map(4/4) switched to SCHEDULED
10:14:00,660 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(4/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(4/4) switched to DEPLOYING
10:14:00,660 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from CREATED to SCHEDULED
10:14:00,660 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to SCHEDULED
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to SCHEDULED
10:14:00,660 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from SCHEDULED to DEPLOYING
10:14:00,661 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to DEPLOYING
10:14:00,661 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Filter -> Map -> Sink: Unnamed (1/4) (attempt #0) to localhost
10:14:00,662 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from CREATED to SCHEDULED
10:14:00,662 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to SCHEDULED
10:14:00,662 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to SCHEDULED
10:14:00,662 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to DEPLOYING
10:14:00,662 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Filter -> Map -> Sink: Unnamed (2/4) (attempt #0) to localhost
10:14:00,663 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from CREATED to SCHEDULED
10:14:00,664 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to SCHEDULED
10:14:00,664 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to SCHEDULED
10:14:00,664 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Filter -> Map -> Sink: Unnamed (3/4) (attempt #0) to localhost
10:14:00,664 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to DEPLOYING
10:14:00,665 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from CREATED to SCHEDULED
10:14:00,666 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to SCHEDULED
10:14:00,666 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to SCHEDULED
10:14:00,666 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying Filter -> Map -> Sink: Unnamed (4/4) (attempt #0) to localhost
10:14:00,666 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to DEPLOYING
10:14:00,674 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Source: Custom Source (1/1)
10:14:00,674 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Source: Custom Source (1/1)
10:14:00,678 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Map (1/4)
10:14:00,678 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Map (1/4)
10:14:00,683 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Map (2/4)
10:14:00,684 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Map (2/4)
10:14:00,684 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Map (3/4)
10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Source: Custom Source (1/1) [DEPLOYING]
10:14:00,692 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Map (2/4) [DEPLOYING]
10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Map (3/4)
10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Map (4/4)
10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Map (1/4) [DEPLOYING]
10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Filter -> Map -> Sink: Unnamed (1/4)
10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Map (3/4) [DEPLOYING]
10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Filter -> Map -> Sink: Unnamed (2/4)
10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Map (4/4)
10:14:00,696 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Filter -> Map -> Sink: Unnamed (1/4)
10:14:00,703 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (2/4) switched to RUNNING
10:14:00,704 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Filter -> Map -> Sink: Unnamed (1/4) [DEPLOYING]
10:14:00,701 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) switched to RUNNING
10:14:00,698 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Filter -> Map -> Sink: Unnamed (2/4)
10:14:00,709 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Filter -> Map -> Sink: Unnamed (2/4) [DEPLOYING]
10:14:00,699 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Filter -> Map -> Sink: Unnamed (3/4)
10:14:00,710 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Filter -> Map -> Sink: Unnamed (3/4)
10:14:00,698 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Map (4/4) [DEPLOYING]
10:14:00,711 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Filter -> Map -> Sink: Unnamed (4/4)
10:14:00,710 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (3/4) switched to RUNNING
10:14:00,715 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (1/4) switched to RUNNING
10:14:00,719 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Filter -> Map -> Sink: Unnamed (4/4)
10:14:00,724 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (1/4) switched to RUNNING
10:14:00,723 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (2/4) switched to RUNNING
10:14:00,720 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (4/4) switched to RUNNING
10:14:00,718 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from DEPLOYING to RUNNING
10:14:00,725 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from DEPLOYING to RUNNING
10:14:00,725 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(2/4) switched to RUNNING
08/08/2016 10:14:00 Map(2/4) switched to RUNNING
10:14:00,726 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to RUNNING
08/08/2016 10:14:00 Source: Custom Source(1/1) switched to RUNNING
10:14:00,730 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from DEPLOYING to RUNNING
10:14:00,730 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(3/4) switched to RUNNING
08/08/2016 10:14:00 Map(3/4) switched to RUNNING
10:14:00,732 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from DEPLOYING to RUNNING
10:14:00,732 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(1/4) switched to RUNNING
08/08/2016 10:14:00 Map(1/4) switched to RUNNING
10:14:00,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from DEPLOYING to RUNNING
10:14:00,724 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Filter -> Map -> Sink: Unnamed (3/4) [DEPLOYING]
10:14:00,736 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Filter -> Map -> Sink: Unnamed (4/4) [DEPLOYING]
10:14:00,736 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to RUNNING
10:14:00,750 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (3/4) switched to RUNNING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to RUNNING
10:14:00,750 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from DEPLOYING to RUNNING
10:14:00,751 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from DEPLOYING to RUNNING
10:14:00,751 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to RUNNING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to RUNNING
10:14:00,752 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(4/4) switched to RUNNING
08/08/2016 10:14:00 Map(4/4) switched to RUNNING
10:14:00,751 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from DEPLOYING to RUNNING
10:14:00,757 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (4/4) switched to RUNNING
10:14:00,770 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from DEPLOYING to RUNNING
10:14:00,756 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,766 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,765 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to RUNNING
10:14:00,771 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to RUNNING
10:14:00,771 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,776 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to RUNNING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to RUNNING
10:14:00,782 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,782 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,787 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,787 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,789 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,790 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,791 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,792 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,792 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,793 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,792 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,793 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,794 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,794 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,796 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,796 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,796 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,796 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,804 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,804 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,806 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,806 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,807 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,807 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,807 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,807 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,808 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,808 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,807 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,809 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:01,032 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,032 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,034 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,034 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
10:14:01,040 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
10:14:01,041 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (3/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,040 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,043 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (2/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,039 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,041 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,045 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (1/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,046 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (4/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,057 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Filter -> Map -> Sink: Unnamed (4/4)
10:14:01,058 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Filter -> Map -> Sink: Unnamed (1/4)
10:14:01,059 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Filter -> Map -> Sink: Unnamed (2/4)
10:14:01,063 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Filter -> Map -> Sink: Unnamed (3/4)
10:14:01,074 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (7c219dda43bc53571d306fc0df4468fa)
10:14:01,078 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (dafdfde1351e3eb9593fa227e8255b57)
10:14:01,080 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (54a627d8091077e6742cd74d754016a7)
10:14:01,082 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (b3a3e69c7dd082fc744b6ec791697e35)
10:14:01,085 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from RUNNING to FAILED
10:14:01,085 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from RUNNING to FAILED
10:14:01,086 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from RUNNING to FAILED
10:14:01,086 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(4/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

10:14:01,087 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(3/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(3/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

10:14:01,088 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(2/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(2/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

10:14:01,089 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from RUNNING to FAILED
10:14:01,090 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(1/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(1/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

10:14:01,092 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job) changed to FAILING.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,092 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Job execution switched to status FAILING.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,092 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from RUNNING to CANCELING
08/08/2016 10:14:01 Job execution switched to status FAILING.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,093 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELING
08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELING
10:14:01,095 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Source: Custom Source (1/1)
10:14:01,096 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) switched to CANCELING
10:14:01,096 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e).
10:14:01,097 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from RUNNING to CANCELING
10:14:01,097 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from RUNNING to CANCELING
10:14:01,097 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from RUNNING to CANCELING
10:14:01,098 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from RUNNING to CANCELING
10:14:01,098 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(1/4) switched to CANCELING
08/08/2016 10:14:01 Map(1/4) switched to CANCELING
10:14:01,098 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(2/4) switched to CANCELING
08/08/2016 10:14:01 Map(2/4) switched to CANCELING
10:14:01,099 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(3/4) switched to CANCELING
08/08/2016 10:14:01 Map(3/4) switched to CANCELING
10:14:01,100 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(4/4) switched to CANCELING
08/08/2016 10:14:01 Map(4/4) switched to CANCELING
10:14:01,107 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,109 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Map (1/4)
10:14:01,109 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (1/4) switched to CANCELING
10:14:01,109 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb).
10:14:01,110 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Map (2/4)
10:14:01,110 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,110 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (2/4) switched to CANCELING
10:14:01,110 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Map (2/4) (99136f14b2e32b44318b49b2ad39dde5).
10:14:01,111 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,111 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Map (3/4)
10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (3/4) switched to CANCELING
10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Map (3/4) (029bc351981b5056208839ce988f0f3f).
10:14:01,112 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Map (4/4)
10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (4/4) switched to CANCELING
10:14:01,113 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Map (4/4) (ae5e9324d04ffac7843317749a2e86dd).
10:14:01,114 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (1/4) switched to CANCELED
10:14:01,114 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Map (1/4)
10:14:01,116 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution b3a3e69c7dd082fc744b6ec791697e35
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (2/4) switched to CANCELED
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution 54a627d8091077e6742cd74d754016a7
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (3/4) switched to CANCELED
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Map (2/4)
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Map (3/4)
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution dafdfde1351e3eb9593fa227e8255b57
10:14:01,119 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Discarding the results produced by task execution 7c219dda43bc53571d306fc0df4468fa
10:14:01,119 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state CANCELED to JobManager for task Map (4a13efc94d64d61532106fbd9bdfaedb)
10:14:01,120 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state CANCELED to JobManager for task Map (99136f14b2e32b44318b49b2ad39dde5)
10:14:01,120 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state CANCELED to JobManager for task Map (029bc351981b5056208839ce988f0f3f)
10:14:01,121 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from CANCELING to CANCELED
10:14:01,121 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from CANCELING to CANCELED
10:14:01,121 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,122 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from CANCELING to CANCELED
10:14:01,122 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(1/4) switched to CANCELED
08/08/2016 10:14:01 Map(1/4) switched to CANCELED
10:14:01,123 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(2/4) switched to CANCELED
08/08/2016 10:14:01 Map(2/4) switched to CANCELED
10:14:01,123 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(3/4) switched to CANCELED
10:14:01,124 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (4/4) switched to CANCELED
08/08/2016 10:14:01 Map(3/4) switched to CANCELED
10:14:01,124 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Map (4/4)
10:14:01,124 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state CANCELED to JobManager for task Map (ae5e9324d04ffac7843317749a2e86dd)
10:14:01,131 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from CANCELING to CANCELED
10:14:01,132 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Map(4/4) switched to CANCELED
08/08/2016 10:14:01 Map(4/4) switched to CANCELED
10:14:01,147 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) switched to CANCELED
10:14:01,148 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (1/1)
10:14:01,148 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source (9ffba0e9f84adca163121d50f88e519e)
10:14:01,149 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from CANCELING to CANCELED
10:14:01,150 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELED
08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELED
10:14:01,154 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job) changed to FAILED.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,154 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Job execution switched to status FAILED.
08/08/2016 10:14:01 Job execution switched to status FAILED.
10:14:01,160 INFO  org.apache.flink.runtime.client.JobClient                     - Job execution failed
10:14:01,160 INFO  org.apache.flink.runtime.client.JobClientActor                - Terminate JobClientActor.
10:14:01,161 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Stopping FlinkMiniCluster.
10:14:01,161 INFO  org.apache.flink.runtime.client.JobClientActor                - Disconnect from JobManager Actor[akka://flink/user/jobmanager_1#-1689614413].
10:14:01,174 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Stopping JobManager akka://flink/user/jobmanager_1.
10:14:01,184 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Stopping TaskManager akka://flink/user/taskmanager_1#2008243751.
10:14:01,184 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Disassociating from JobManager
10:14:01,192 INFO  org.apache.flink.runtime.blob.BlobCache                       - Shutting down BlobCache
10:14:01,194 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:34360
10:14:01,208 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager removed spill file directory /tmp/flink-io-e6686803-fb13-4e2f-a4df-cb6ca465eb2b
10:14:01,212 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Task manager akka://flink/user/taskmanager_1 is completely shut down.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
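
For anyone reading the trace above: reply code 406 (PRECONDITION_FAILED, "parameters ... not equivalent") is what a broker returns when a queue is declared under a name that already exists with different properties. Below is a minimal sketch of that rule using only the JDK. It is a toy model, not the RabbitMQ client or the Flink connector: `QueueDeclareModel`, `QueueParams` and `PreconditionFailed` are hypothetical names, and the durable/non-durable values are assumed for illustration, since the thread does not state which properties actually differed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Toy model of the broker-side equivalence check on queue declaration. */
class QueueDeclareModel {

    /** Properties carried by a declaration (hypothetical holder type). */
    static final class QueueParams {
        final boolean durable;
        final boolean exclusive;
        final boolean autoDelete;

        QueueParams(boolean durable, boolean exclusive, boolean autoDelete) {
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof QueueParams)) {
                return false;
            }
            QueueParams p = (QueueParams) o;
            return durable == p.durable
                    && exclusive == p.exclusive
                    && autoDelete == p.autoDelete;
        }

        @Override
        public int hashCode() {
            return Objects.hash(durable, exclusive, autoDelete);
        }
    }

    /** Stand-in for the 406 channel error seen in the trace. */
    static final class PreconditionFailed extends RuntimeException {
        final int replyCode = 406;

        PreconditionFailed(String message) {
            super(message);
        }
    }

    // Queues known to the model broker, keyed by queue name.
    private final Map<String, QueueParams> queues = new HashMap<>();

    /** Creates the queue if absent; rejects a redeclaration whose properties differ. */
    void declare(String name, QueueParams requested) {
        QueueParams existing = queues.putIfAbsent(name, requested);
        if (existing != null && !existing.equals(requested)) {
            throw new PreconditionFailed(
                    "PRECONDITION_FAILED - parameters for queue '" + name + "' not equivalent");
        }
    }

    public static void main(String[] args) {
        QueueDeclareModel broker = new QueueDeclareModel();
        // Queue created by hand first, here assumed to be durable.
        broker.declare("flink-sink", new QueueParams(true, false, false));
        try {
            // A client then declares the same name, here assumed non-durable.
            broker.declare("flink-sink", new QueueParams(false, false, false));
        } catch (PreconditionFailed e) {
            System.out.println(e.replyCode + ": " + e.getMessage());
        }
    }
}
```

In this model, redeclaring with identical properties succeeds and only a mismatch fails. That matches the fix reported at the top of the thread: once the hand-made queue was deleted and Flink created it, the sink's own declaration had nothing to conflict with.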



________________________________
From: Robert Metzger <rmetzger@apache.org>
Sent: Monday, August 8, 2016 9:48:39 AM
To: user@flink.apache.org
Subject: Re: Using RabbitMQ Sinks

Hi Paul,

The example in the code is outdated; StringToByteSerializer was probably removed quite a while ago. I'll update the documentation once we have figured out the other problem you reported.
What's the exception you are getting?

Regards,
Robert

On Mon, Aug 8, 2016 at 4:33 PM, Paul Joireman <paul.joireman@physiq.com> wrote:

Hi all,


The documentation describing the use of RabbitMQ as a sink gives the following example:


RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost").setPort(5000).setUserName(..)
.setPassword(..).setVirtualHost("/").build();
stream.addSink(new RMQSink<String>(connectionConfig, "hello", new StringToByteSerializer()));

However, a search of the Flink GitHub mirror (https://github.com/apache/flink) does not show where StringToByteSerializer is defined; it appears only in the documentation of this example. I've tried using a SimpleStringSchema, which seems to handle serialization, but this raises an exception when I attempt to run it.


Does anyone have any experience with using a RabbitMQ sink? Any pointers as to what I'm doing wrong?


Thanks,

Paul
