From: Paul Joireman
To: "user@flink.apache.org"
Subject: Re: Using RabbitMQ Sinks
Date: Mon, 8 Aug 2016 15:27:07 +0000

Robert,

It looks like the root cause exception is:

com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)

The full execution trace is shown below; the exception occurs when I try to execute my process. In the process, I first create a connection to RMQ using RMQConnectionConfig.Builder(). This works fine when I read using a SimpleStringSchema, but the exception is thrown when I re-use the same connection configuration for the sink as follows:

msgs.addSink(new RMQSink(rmqConnConfig, ALERT_SINK_QUEUE, new SimpleStringSchema()));

where the elements of msgs are Strings.

Paul

Connected to the target VM, address: '127.0.0.1:42208', transport: 'socket'
10:13:50,555 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.physiq.alert.AlertMessageIn is not a valid POJO type
10:13:57,035 INFO org.apache.flink.streaming.api.environment.LocalStreamEnvironment - Running job on local embedded Flink mini cluster
10:13:58,100 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting FlinkMiniCluster.
10:13:58,580 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
10:13:58,614 INFO org.apache.flink.runtime.blob.BlobServer - Created BLOB server storage directory /tmp/blobStore-b19e22df-4636-4e96-bb45-db70c0bcc7e1
10:13:58,620 INFO org.apache.flink.runtime.blob.BlobServer - Started BLOB server at 0.0.0.0:34360 - max concurrent requests: 50 - max backlog: 1000
10:13:58,635 INFO org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory - Using job manager savepoint state backend.
10:13:58,642 INFO org.apache.flink.runtime.metrics.MetricRegistry - No metrics reporter configured, no metrics will be exposed/reported.
10:13:58,655 INFO org.apache.flink.runtime.jobmanager.MemoryArchivist - Started memory archivist akka://flink/user/archive_1
10:13:58,657 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager at akka://flink/user/jobmanager_1.
10:13:58,678 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
10:13:58,683 INFO org.apache.flink.runtime.taskmanager.TaskManager - Messages between TaskManager and JobManager have a max timeout of 10000 milliseconds
10:13:58,703 INFO org.apache.flink.runtime.taskmanager.TaskManager - Temporary file directory '/tmp': total 102 GB, usable 9 GB (8.82% usable)
10:13:58,844 INFO org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
10:13:58,846 INFO org.apache.flink.runtime.taskmanager.TaskManager - Limiting managed memory to 549 MB, memory will be allocated lazily.
10:14:00,163 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager uses directory /tmp/flink-io-e6686803-fb13-4e2f-a4df-cb6ca465eb2b for spill files.
10:14:00,167 INFO org.apache.flink.runtime.jobmanager.JobManager - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID None.
10:14:00,239 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-2e8256c3-532c-45b6-ad2d-fa7b43846a6e
10:14:00,239 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager_1#-1689614413] - leader session null
10:14:00,495 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor at akka://flink/user/taskmanager_1#2008243751.
10:14:00,496 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager data connection information: localhost (dataPort=58335)
10:14:00,496 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 4 task slot(s).
10:14:00,498 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 77/195/1701 MB, NON HEAP: 22/33/214 MB (used/committed/max)]
10:14:00,506 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
10:14:00,508 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - TaskManager ResourceID{resourceId='8ca61ba2f8741d54ce649bd0d525d50c'} has started.
10:14:00,511 INFO org.apache.flink.runtime.instance.InstanceManager - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as db51cf9997f8059543464810ffaffea3. Current number of registered hosts is 1. Current number of alive task slots is 4.
10:14:00,517 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
10:14:00,534 INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be localhost/127.0.0.1:34360. Starting BLOB cache.
10:14:00,535 INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-1a9647ad-11f0-40de-b24c-0e2c57c030d0
10:14:00,535 INFO org.apache.flink.runtime.metrics.MetricRegistry - No metrics reporter configured, no metrics will be exposed/reported.
10:14:00,549 INFO org.apache.flink.runtime.client.JobClientActor - Received job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705).
10:14:00,549 INFO org.apache.flink.runtime.client.JobClientActor - Could not submit job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705), because there is no connection to a JobManager.
10:14:00,551 INFO org.apache.flink.runtime.client.JobClientActor - Disconnect from JobManager null.
10:14:00,557 INFO org.apache.flink.runtime.client.JobClientActor - Connect to JobManager Actor[akka://flink/user/jobmanager_1#-1689614413].
10:14:00,557 INFO org.apache.flink.runtime.client.JobClientActor - Connected to new JobManager akka://flink/user/jobmanager_1.
10:14:00,557 INFO org.apache.flink.runtime.client.JobClientActor - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705) and wait for progress
10:14:00,558 INFO org.apache.flink.runtime.client.JobClientActor - Upload jar files to job manager akka://flink/user/jobmanager_1.
10:14:00,560 INFO org.apache.flink.runtime.client.JobClientActor - Submit job to the job manager akka://flink/user/jobmanager_1.
10:14:00,563 INFO org.apache.flink.runtime.jobmanager.JobManager - Submitting job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job).
10:14:00,568 INFO org.apache.flink.runtime.jobmanager.JobManager - Using restart strategy NoRestartStrategy for 6d641f666b26088a843355ff3b0b1705.
10:14:00,628 INFO org.apache.flink.runtime.jobmanager.JobManager - Scheduling job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job).
10:14:00,628 INFO org.apache.flink.runtime.client.JobClientActor - Job was successfully submitted to the JobManager akka://flink/user/jobmanager_1.
10:14:00,631 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from CREATED to SCHEDULED
10:14:00,631 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Job execution switched to status RUNNING.
08/08/2016 10:14:00 Job execution switched to status RUNNING.
10:14:00,632 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to SCHEDULED
08/08/2016 10:14:00 Source: Custom Source(1/1) switched to SCHEDULED
10:14:00,633 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job) changed to RUNNING.
10:14:00,637 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from SCHEDULED to DEPLOYING
10:14:00,638 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to DEPLOYING
08/08/2016 10:14:00 Source: Custom Source(1/1) switched to DEPLOYING
10:14:00,638 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source (1/1) (attempt #0) to localhost
10:14:00,642 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from CREATED to SCHEDULED
10:14:00,642 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(1/4) switched to SCHEDULED
08/08/2016 10:14:00 Map(1/4) switched to SCHEDULED
10:14:00,643 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from SCHEDULED to DEPLOYING
10:14:00,643 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map (1/4) (attempt #0) to localhost
10:14:00,643 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(1/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(1/4) switched to DEPLOYING
10:14:00,648 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from CREATED to SCHEDULED
10:14:00,648 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from SCHEDULED to DEPLOYING
10:14:00,649 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(2/4) switched to SCHEDULED
08/08/2016 10:14:00 Map(2/4) switched to SCHEDULED
10:14:00,649 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(2/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(2/4) switched to DEPLOYING
10:14:00,649 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map (2/4) (attempt #0) to localhost
10:14:00,650 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from CREATED to SCHEDULED
10:14:00,650 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(3/4) switched to SCHEDULED
10:14:00,650 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Map(3/4) switched to SCHEDULED
10:14:00,655 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map (3/4) (attempt #0) to localhost
10:14:00,656 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from CREATED to SCHEDULED
10:14:00,657 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from SCHEDULED to DEPLOYING
10:14:00,657 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(3/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(3/4) switched to DEPLOYING
10:14:00,659 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map (4/4) (attempt #0) to localhost
10:14:00,659 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(4/4) switched to SCHEDULED
08/08/2016 10:14:00 Map(4/4) switched to SCHEDULED
10:14:00,660 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(4/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(4/4) switched to DEPLOYING
10:14:00,660 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from CREATED to SCHEDULED
10:14:00,660 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to SCHEDULED
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to SCHEDULED
10:14:00,660 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from SCHEDULED to DEPLOYING
10:14:00,661 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to DEPLOYING
10:14:00,661 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Filter -> Map -> Sink: Unnamed (1/4) (attempt #0) to localhost
10:14:00,662 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from CREATED to SCHEDULED
10:14:00,662 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to SCHEDULED
10:14:00,662 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to SCHEDULED
10:14:00,662 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to DEPLOYING
10:14:00,662 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Filter -> Map -> Sink: Unnamed (2/4) (attempt #0) to localhost
10:14:00,663 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from CREATED to SCHEDULED
10:14:00,664 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to SCHEDULED
10:14:00,664 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to SCHEDULED
10:14:00,664 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Filter -> Map -> Sink: Unnamed (3/4) (attempt #0) to localhost
10:14:00,664 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to DEPLOYING
10:14:00,665 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from CREATED to SCHEDULED
10:14:00,666 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to SCHEDULED
10:14:00,666 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to SCHEDULED
10:14:00,666 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Filter -> Map -> Sink: Unnamed (4/4) (attempt #0) to localhost
10:14:00,666 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to DEPLOYING
10:14:00,674 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Source: Custom Source (1/1)
10:14:00,674 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: Custom Source (1/1)
10:14:00,678 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Map (1/4)
10:14:00,678 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Map (1/4)
10:14:00,683 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Map (2/4)
10:14:00,684 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Map (2/4)
10:14:00,684 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Map (3/4)
10:14:00,691 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Source: Custom Source (1/1) [DEPLOYING]
10:14:00,692 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Map (2/4) [DEPLOYING]
10:14:00,691 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Map (3/4)
10:14:00,691 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Map (4/4)
10:14:00,691 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Map (1/4) [DEPLOYING]
10:14:00,695 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Filter -> Map -> Sink: Unnamed (1/4)
10:14:00,695 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Map (3/4) [DEPLOYING]
10:14:00,695 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Filter -> Map -> Sink: Unnamed (2/4)
10:14:00,695 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Map (4/4)
10:14:00,696 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Filter -> Map -> Sink: Unnamed (1/4)
10:14:00,703 INFO org.apache.flink.runtime.taskmanager.Task - Map (2/4) switched to RUNNING
10:14:00,704 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Filter -> Map -> Sink: Unnamed (1/4) [DEPLOYING]
10:14:00,701 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) switched to RUNNING
10:14:00,698 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Filter -> Map -> Sink: Unnamed (2/4)
10:14:00,709 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Filter -> Map -> Sink: Unnamed (2/4) [DEPLOYING]
10:14:00,699 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Filter -> Map -> Sink: Unnamed (3/4)
10:14:00,710 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Filter -> Map -> Sink: Unnamed (3/4)
10:14:00,698 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Map (4/4) [DEPLOYING]
10:14:00,711 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Filter -> Map -> Sink: Unnamed (4/4)
10:14:00,710 INFO org.apache.flink.runtime.taskmanager.Task - Map (3/4) switched to RUNNING
10:14:00,715 INFO org.apache.flink.runtime.taskmanager.Task - Map (1/4) switched to RUNNING
10:14:00,719 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Filter -> Map -> Sink: Unnamed (4/4)
10:14:00,724 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (1/4) switched to RUNNING
10:14:00,723 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (2/4) switched to RUNNING
10:14:00,720 INFO org.apache.flink.runtime.taskmanager.Task - Map (4/4) switched to RUNNING
10:14:00,718 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from DEPLOYING to RUNNING
10:14:00,725 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from DEPLOYING to RUNNING
10:14:00,725 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(2/4) switched to RUNNING
08/08/2016 10:14:00 Map(2/4) switched to RUNNING
10:14:00,726 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to RUNNING
08/08/2016 10:14:00 Source: Custom Source(1/1) switched to RUNNING
10:14:00,730 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from DEPLOYING to RUNNING
10:14:00,730 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(3/4) switched to RUNNING
08/08/2016 10:14:00 Map(3/4) switched to RUNNING
10:14:00,732 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from DEPLOYING to RUNNING
10:14:00,732 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(1/4) switched to RUNNING
08/08/2016 10:14:00 Map(1/4) switched to RUNNING
10:14:00,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from DEPLOYING to RUNNING
10:14:00,724 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Filter -> Map -> Sink: Unnamed (3/4) [DEPLOYING]
10:14:00,736 INFO org.apache.flink.runtime.taskmanager.Task - Registering task at network: Filter -> Map -> Sink: Unnamed (4/4) [DEPLOYING]
10:14:00,736 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to RUNNING
10:14:00,750 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (3/4) switched to RUNNING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to RUNNING
10:14:00,750 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from DEPLOYING to RUNNING
10:14:00,751 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from DEPLOYING to RUNNING
10:14:00,751 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to RUNNING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to RUNNING
10:14:00,752 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(4/4) switched to RUNNING
08/08/2016 10:14:00 Map(4/4) switched to RUNNING
10:14:00,751 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from DEPLOYING to RUNNING
10:14:00,757 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (4/4) switched to RUNNING
10:14:00,770 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from DEPLOYING to RUNNING
10:14:00,756 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,766 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,765 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to RUNNING
10:14:00,771 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to RUNNING
10:14:00,771 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,776 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to RUNNING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to RUNNING
10:14:00,782 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,782 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,787 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,787 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,789 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,790 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,791 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,792 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,792 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,793 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,792 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,793 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,794 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,794 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,796 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,796 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,796 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,796 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,804 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,804 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,806 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,806 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,807 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,807 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,807 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,807 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,808 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,808 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,807 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,809 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoint to jobmanager)
10:14:01,032 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,032 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,034 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,034 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
    at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
    at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
    at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
10:14:01,040 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
    at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
10:14:01,041 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (3/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,040 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,043 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (2/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,039 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,041 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,045 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (1/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,046 INFO org.apache.flink.runtime.taskmanager.Task - Filter -> Map -> Sink: Unnamed (4/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,057 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Filter -> Map -> Sink: Unnamed (4/4)
10:14:01,058 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Filter -> Map -> Sink: Unnamed (1/4)
10:14:01,059 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Filter -> Map -> Sink: Unnamed (2/4)
10:14:01,063 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Filter -> Map -> Sink: Unnamed (3/4)
10:14:01,074 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (7c219dda43bc53571d306fc0df4468fa)
10:14:01,078 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (dafdfde1351e3eb9593fa227e8255b57)
10:14:01,080 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (54a627d8091077e6742cd74d754016a7)
10:14:01,082 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (b3a3e69c7dd082fc744b6ec791697e35)
10:14:01,085 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from RUNNING to FAILED
10:14:01,085 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from RUNNING to FAILED
10:14:01,086 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from RUNNING to FAILED
10:14:01,086 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(4/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(4/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ...
9 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChan= nel.java:144) at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:5= 50) ... 1 more 10:14:01,087 INFO org.apache.flink.runtime.client.JobClientActor = - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(3/4) switched to= FAILED java.lang.RuntimeException: Error while creating the channel at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :84) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(Fu= nctionUtils.java:38) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(= AbstractUdfStreamOperator.java:91) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(Str= eamTask.java:376) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.ja= va:256) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61) at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :82) ... 
6 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(Bl= ockingValueOrException.java:33) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQ= Channel.java:343) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ... 9 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChan= nel.java:144) at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:5= 50) ... 
1 more 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(3/4) switched to FAILED java.lang.RuntimeException: Error while creating the channel at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :84) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(Fu= nctionUtils.java:38) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(= AbstractUdfStreamOperator.java:91) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(Str= eamTask.java:376) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.ja= va:256) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61) at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :82) ... 6 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(Bl= ockingValueOrException.java:33) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQ= Channel.java:343) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ... 
9 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChan= nel.java:144) at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:5= 50) ... 1 more 10:14:01,088 INFO org.apache.flink.runtime.client.JobClientActor = - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(2/4) switched to= FAILED java.lang.RuntimeException: Error while creating the channel at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :84) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(Fu= nctionUtils.java:38) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(= AbstractUdfStreamOperator.java:91) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(Str= eamTask.java:376) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.ja= va:256) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61) at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :82) ... 
6 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(Bl= ockingValueOrException.java:33) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQ= Channel.java:343) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ... 9 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChan= nel.java:144) at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:5= 50) ... 
1 more 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(2/4) switched to FAILED java.lang.RuntimeException: Error while creating the channel at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :84) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(Fu= nctionUtils.java:38) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(= AbstractUdfStreamOperator.java:91) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(Str= eamTask.java:376) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.ja= va:256) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61) at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :82) ... 6 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(Bl= ockingValueOrException.java:33) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQ= Channel.java:343) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ... 
9 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChan= nel.java:144) at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:5= 50) ... 1 more 10:14:01,089 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph = - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697= e35) switched from RUNNING to FAILED 10:14:01,090 INFO org.apache.flink.runtime.client.JobClientActor = - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(1/4) switched to= FAILED java.lang.RuntimeException: Error while creating the channel at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :84) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(Fu= nctionUtils.java:38) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(= AbstractUdfStreamOperator.java:91) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(Str= eamTask.java:376) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.ja= va:256) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61) at 
org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :82) ... 6 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(Bl= ockingValueOrException.java:33) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQ= Channel.java:343) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ... 9 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChan= nel.java:144) at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:5= 50) ... 
1 more 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(1/4) switched to FAILED java.lang.RuntimeException: Error while creating the channel at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :84) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(Fu= nctionUtils.java:38) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(= AbstractUdfStreamOperator.java:91) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(Str= eamTask.java:376) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.ja= va:256) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61) at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :82) ... 6 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(Bl= ockingValueOrException.java:33) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQ= Channel.java:343) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ... 
9 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChan= nel.java:144) at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:5= 50) ... 1 more 10:14:01,092 INFO org.apache.flink.runtime.jobmanager.JobManager = - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job= ) changed to FAILING. java.lang.RuntimeException: Error while creating the channel at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :84) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(Fu= nctionUtils.java:38) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(= AbstractUdfStreamOperator.java:91) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(Str= eamTask.java:376) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.ja= va:256) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61) at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :82) ... 
6 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(Bl= ockingValueOrException.java:33) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQ= Channel.java:343) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ... 9 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChan= nel.java:144) at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:5= 50) ... 1 more 10:14:01,092 INFO org.apache.flink.runtime.client.JobClientActor = - 08/08/2016 10:14:01 Job execution switched to status FAILING. 
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,092 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from RUNNING to CANCELING
08/08/2016 10:14:01 Job execution switched to status FAILING.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,093 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELING
08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELING
10:14:01,095 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source (1/1)
10:14:01,096 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) switched to CANCELING
10:14:01,096 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e).
10:14:01,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from RUNNING to CANCELING
10:14:01,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from RUNNING to CANCELING
10:14:01,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from RUNNING to CANCELING
10:14:01,098 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from RUNNING to CANCELING
10:14:01,098 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(1/4) switched to CANCELING
08/08/2016 10:14:01 Map(1/4) switched to CANCELING
10:14:01,098 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(2/4) switched to CANCELING
08/08/2016 10:14:01 Map(2/4) switched to CANCELING
10:14:01,099 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(3/4) switched to CANCELING
08/08/2016 10:14:01 Map(3/4) switched to CANCELING
10:14:01,100 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(4/4) switched to CANCELING
08/08/2016 10:14:01 Map(4/4) switched to CANCELING
10:14:01,107 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,109 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Map (1/4)
10:14:01,109 INFO org.apache.flink.runtime.taskmanager.Task - Map (1/4) switched to CANCELING
10:14:01,109 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb).
10:14:01,110 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Map (2/4)
10:14:01,110 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,110 INFO org.apache.flink.runtime.taskmanager.Task - Map (2/4) switched to CANCELING
10:14:01,110 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Map (2/4) (99136f14b2e32b44318b49b2ad39dde5).
10:14:01,111 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,111 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Map (3/4)
10:14:01,112 INFO org.apache.flink.runtime.taskmanager.Task - Map (3/4) switched to CANCELING
10:14:01,112 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Map (3/4) (029bc351981b5056208839ce988f0f3f).
10:14:01,112 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,112 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Map (4/4)
10:14:01,112 INFO org.apache.flink.runtime.taskmanager.Task - Map (4/4) switched to CANCELING
10:14:01,113 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Map (4/4) (ae5e9324d04ffac7843317749a2e86dd).
10:14:01,114 INFO org.apache.flink.runtime.taskmanager.Task - Map (1/4) switched to CANCELED
10:14:01,114 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map (1/4)
10:14:01,116 INFO org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results produced by task execution b3a3e69c7dd082fc744b6ec791697e35
10:14:01,118 INFO org.apache.flink.runtime.taskmanager.Task - Map (2/4) switched to CANCELED
10:14:01,118 INFO org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results produced by task execution 54a627d8091077e6742cd74d754016a7
10:14:01,118 INFO org.apache.flink.runtime.taskmanager.Task - Map (3/4) switched to CANCELED
10:14:01,118 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map (2/4)
10:14:01,118 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map (3/4)
10:14:01,118 INFO org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results produced by task execution dafdfde1351e3eb9593fa227e8255b57
10:14:01,119 INFO org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results produced by task execution 7c219dda43bc53571d306fc0df4468fa
10:14:01,119 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state CANCELED to JobManager for task Map (4a13efc94d64d61532106fbd9bdfaedb)
10:14:01,120 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state CANCELED to JobManager for task Map (99136f14b2e32b44318b49b2ad39dde5)
10:14:01,120 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state CANCELED to JobManager for task Map (029bc351981b5056208839ce988f0f3f)
10:14:01,121 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from CANCELING to CANCELED
10:14:01,121 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from CANCELING to CANCELED
10:14:01,121 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
10:14:01,122 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from CANCELING to CANCELED
10:14:01,122 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(1/4) switched to CANCELED
08/08/2016 10:14:01 Map(1/4) switched to CANCELED
10:14:01,123 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(2/4) switched to CANCELED
08/08/2016 10:14:01 Map(2/4) switched to CANCELED
10:14:01,123 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(3/4) switched to CANCELED
10:14:01,124 INFO org.apache.flink.runtime.taskmanager.Task - Map (4/4) switched to CANCELED
08/08/2016 10:14:01 Map(3/4) switched to CANCELED
10:14:01,124 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map (4/4)
10:14:01,124 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state CANCELED to JobManager for task Map (ae5e9324d04ffac7843317749a2e86dd)
10:14:01,131 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from CANCELING to CANCELED
10:14:01,132 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Map(4/4) switched to CANCELED
08/08/2016 10:14:01 Map(4/4) switched to CANCELED
10:14:01,147 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) switched to CANCELED
10:14:01,148 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source (1/1)
10:14:01,148 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source (9ffba0e9f84adca163121d50f88e519e)
10:14:01,149 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from CANCELING to CANCELED
10:14:01,150 INFO org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELED
08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELED
10:14:01,154 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job) changed to FAILED.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ...
6 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(Bl= ockingValueOrException.java:33) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQ= Channel.java:343) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ... 9 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChan= nel.java:144) at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:5= 50) ... 1 more 10:14:01,154 INFO org.apache.flink.runtime.client.JobClientActor = - 08/08/2016 10:14:01 Job execution switched to status FAILED. 08/08/2016 10:14:01 Job execution switched to status FAILED. 10:14:01,160 INFO org.apache.flink.runtime.client.JobClient = - Job execution failed 10:14:01,160 INFO org.apache.flink.runtime.client.JobClientActor = - Terminate JobClientActor. 10:14:01,161 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster = - Stopping FlinkMiniCluster. 10:14:01,161 INFO org.apache.flink.runtime.client.JobClientActor = - Disconnect from JobManager Actor[akka://flink/user/jobmanager_1#-16= 89614413]. 
10:14:01,174 INFO org.apache.flink.runtime.jobmanager.JobManager = - Stopping JobManager akka://flink/user/jobmanager_1. 10:14:01,184 INFO org.apache.flink.runtime.taskmanager.TaskManager = - Stopping TaskManager akka://flink/user/taskmanager_1#2008243751. 10:14:01,184 INFO org.apache.flink.runtime.taskmanager.TaskManager = - Disassociating from JobManager 10:14:01,192 INFO org.apache.flink.runtime.blob.BlobCache = - Shutting down BlobCache 10:14:01,194 INFO org.apache.flink.runtime.blob.BlobServer = - Stopped BLOB server at 0.0.0.0:34360 10:14:01,208 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager = - I/O manager removed spill file directory /tmp/flink-io-e6686803-fb1= 3-4e2f-a4df-cb6ca465eb2b 10:14:01,212 INFO org.apache.flink.runtime.taskmanager.TaskManager = - Task manager akka://flink/user/taskmanager_1 is completely shut dow= n. Exception in thread "main" org.apache.flink.runtime.client.JobExecutionExce= ption: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$= $anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$= $anonfun$applyOrElse$8.apply(JobManager.scala:768) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$= $anonfun$applyOrElse$8.apply(JobManager.scala:768) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Fut= ure.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:= 24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(Abstrac= tDispatcher.scala:401) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.ja= va:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.= java:107) Caused by: java.lang.RuntimeException: Error while creating the channel at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :84) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(Fu= nctionUtils.java:38) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(= AbstractUdfStreamOperator.java:91) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(Str= eamTask.java:376) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.ja= va:256) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779) at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61) at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java= :82) ... 6 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(Bl= ockingValueOrException.java:33) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQ= Channel.java:343) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ... 
9 more Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; prot= ocol method: #method(reply-code=3D406, reply-text=3DPRECONDI= TION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent= , class-id=3D50, method-id=3D10) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChan= nel.java:144) at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:5= 50) ... 1 more ________________________________ From: Robert Metzger Sent: Monday, August 8, 2016 9:48:39 AM To: user@flink.apache.org Subject: Re: Using RabbitMQ Sinks Hi Paul, the example in the code is outdated, StringToByteSerializer has probably be= en removed quite a while ago. I'll update the documentation once we figured= out the other problem you reported. What's the exception you are getting? Regards, Robert On Mon, Aug 8, 2016 at 4:33 PM, Paul Joireman > wrote: Hi all, The documentation describing the use of RabbitMQ as a sink gives the follow= ing example: RMQConnectionConfig connectionConfig =3D new RMQConnectionConfig.Builder() .setHost("localhost").setPort(5000).setUserName(..) .setPassword(..).setVirtualHost("/").build(); stream.addSink(new RMQSink(connectionConfig, "hello", new StringToB= yteSerializer())); However, a search of the flink github mirrored repo does not show where StringToByteSerializer is defined and only sh= ows it being used in the documentation of this example. I've tried using a SimpleStringSchema which seems to handl= e serialization but this raises an exception when I attempt to run it. Does anyone have any experience with using a RabbitMQ sink? Any pointers = as to what I'm doing wrong. 
Thanks, Paul --_000_CY1PR02MB1995F96E96479E767C50F1CA801B0CY1PR02MB1995namp_ Content-Type: text/html; charset="us-ascii" Content-Transfer-Encoding: quoted-printable

Robert,  


It looks like the root cause exception is: 


com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
The full execution trace is shown below; the exception occurs when I try to execute my process. In the process, I initially create a connection to RMQ using RMQConnectionConfig.Builder(), and this works fine when I read using a SimpleStringSchema. The exception is raised when I re-use the same connection configuration for the sink, as follows:

msgs.addSink(new RMQSink<String>(rmqConnConfig, ALERT_SINK_QUEUE, new SimpleStringSchema()));

Where msgs are Strings.
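
For context on the error text: a 406 PRECONDITION_FAILED reply to queue.declare is what an AMQP 0-9-1 broker such as RabbitMQ sends when an existing queue is redeclared with properties (durable, exclusive, auto-delete, arguments) that differ from the ones it was created with. The sketch below is a hypothetical in-memory model of that equivalence check, not RabbitMQ or Flink code. The class and method names are made up for illustration, queue arguments are left out, and the trace does not say which property actually differs for 'flink-sink', so the durable flag is only an assumed example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of a broker's queue.declare equivalence check. */
public class QueueDeclareModel {

    /** The queue flags that are compared when a queue is redeclared. */
    public static final class QueueParams {
        final boolean durable;
        final boolean exclusive;
        final boolean autoDelete;

        public QueueParams(boolean durable, boolean exclusive, boolean autoDelete) {
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof QueueParams)) {
                return false;
            }
            QueueParams other = (QueueParams) o;
            return durable == other.durable
                    && exclusive == other.exclusive
                    && autoDelete == other.autoDelete;
        }

        @Override
        public int hashCode() {
            return (durable ? 4 : 0) + (exclusive ? 2 : 0) + (autoDelete ? 1 : 0);
        }
    }

    // Queues known to this model broker, keyed by queue name.
    private final Map<String, QueueParams> queues = new HashMap<>();

    /**
     * Declares a queue. Returns "OK" if the queue is new or already exists
     * with equal flags, otherwise a 406-style error text.
     */
    public String declare(String name, QueueParams params) {
        QueueParams existing = queues.putIfAbsent(name, params);
        if (existing == null || existing.equals(params)) {
            return "OK";
        }
        return "406 PRECONDITION_FAILED - parameters for queue '" + name + "' not equivalent";
    }

    public static void main(String[] args) {
        QueueDeclareModel broker = new QueueDeclareModel();
        // Assumed example: the queue is first created as durable ...
        System.out.println(broker.declare("flink-sink", new QueueParams(true, false, false)));
        // ... and a second client then declares it as non-durable.
        System.out.println(broker.declare("flink-sink", new QueueParams(false, false, false)));
    }
}
```

In this model, the second declaration is rejected while a repeat of the first one would be accepted, which is why comparing the flags of the existing queue with the ones the sink declares is a reasonable first check.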

Paul

Connected to the target VM, address: '127.0.0.1:42208', transport: 'socket'
10:13:50,555 INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class com.physiq.alert.AlertMessageIn is not a valid POJO type
10:13:57,035 INFO  org.apache.flink.streaming.api.environment.LocalStreamEnvironment - Running job on local embedded Flink mini cluster
10:13:58,100 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting FlinkMiniCluster.
10:13:58,580 INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
10:13:58,614 INFO  org.apache.flink.runtime.blob.BlobServer - Created BLOB server storage directory /tmp/blobStore-b19e22df-4636-4e96-bb45-db70c0bcc7e1
10:13:58,620 INFO  org.apache.flink.runtime.blob.BlobServer - Started BLOB server at 0.0.0.0:34360 - max concurrent requests: 50 - max backlog: 1000
10:13:58,635 INFO  org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory - Using job manager savepoint state backend.
10:13:58,642 INFO  org.apache.flink.runtime.metrics.MetricRegistry - No metrics reporter configured, no metrics will be exposed/reported.
10:13:58,655 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist - Started memory archivist akka://flink/user/archive_1
10:13:58,657 INFO  org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager at akka://flink/user/jobmanager_1.
10:13:58,678 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
10:13:58,683 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Messages between TaskManager and JobManager have a max timeout of 10000 milliseconds
10:13:58,703 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Temporary file directory '/tmp': total 102 GB, usable 9 GB (8.82% usable)
10:13:58,844 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
10:13:58,846 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Limiting managed memory to 549 MB, memory will be allocated lazily.
10:14:00,163 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager uses directory /tmp/flink-io-e6686803-fb13-4e2f-a4df-cb6ca465eb2b for spill files.
10:14:00,167 INFO  org.apache.flink.runtime.jobmanager.JobManager - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID None.
10:14:00,239 INFO  org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-2e8256c3-532c-45b6-ad2d-fa7b43846a6e
10:14:00,239 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager_1#-1689614413] - leader session null
10:14:00,495 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor at akka://flink/user/taskmanager_1#2008243751.
10:14:00,496 INFO  org.apache.flink.runtime.taskmanager.TaskManager - TaskManager data connection information: localhost (dataPort=58335)
10:14:00,496 INFO  org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 4 task slot(s).
10:14:00,498 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 77/195/1701 MB, NON HEAP: 22/33/214 MB (used/committed/max)]
10:14:00,506 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
10:14:00,508 INFO  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - TaskManager ResourceID{resourceId='8ca61ba2f8741d54ce649bd0d525d50c'} has started.
10:14:00,511 INFO  org.apache.flink.runtime.instance.InstanceManager - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as db51cf9997f8059543464810ffaffea3. Current number of registered hosts is 1. Current number of alive task slots is 4.
10:14:00,517 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
10:14:00,534 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be localhost/127.0.0.1:34360. Starting BLOB cache.
10:14:00,535 INFO  org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-1a9647ad-11f0-40de-b24c-0e2c57c030d0
10:14:00,535 INFO  org.apache.flink.runtime.metrics.MetricRegistry - No metrics reporter configured, no metrics will be exposed/reported.
10:14:00,549 INFO  org.apache.flink.runtime.client.JobClientActor - Received job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705).
10:14:00,549 INFO  org.apache.flink.runtime.client.JobClientActor - Could not submit job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705), because there is no connection to a JobManager.
10:14:00,551 INFO  org.apache.flink.runtime.client.JobClientActor - Disconnect from JobManager null.
10:14:00,557 INFO  org.apache.flink.runtime.client.JobClientActor - Connect to JobManager Actor[akka://flink/user/jobmanager_1#-1689614413].
10:14:00,557 INFO  org.apache.flink.runtime.client.JobClientActor - Connected to new JobManager akka://flink/user/jobmanager_1.
10:14:00,557 INFO  org.apache.flink.runtime.client.JobClientActor - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job Flink Streaming Job (6d641f666b26088a843355ff3b0b1705) and wait for progress
10:14:00,558 INFO  org.apache.flink.runtime.client.JobClientActor - Upload jar files to job manager akka://flink/user/jobmanager_1.
10:14:00,560 INFO  org.apache.flink.runtime.client.JobClientActor - Submit job to the job manager akka://flink/user/jobmanager_1.
10:14:00,563 INFO  org.apache.flink.runtime.jobmanager.JobManager - Submitting job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job).
10:14:00,568 INFO  org.apache.flink.runtime.jobmanager.JobManager - Using restart strategy NoRestartStrategy for 6d641f666b26088a843355ff3b0b1705.
10:14:00,628 INFO  org.apache.flink.runtime.jobmanager.JobManager - Scheduling job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job).
10:14:00,628 INFO  org.apache.flink.runtime.client.JobClientActor - Job was successfully submitted to the JobManager akka://flink/user/jobmanager_1.
10:14:00,631 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from CREATED to SCHEDULED
10:14:00,631 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Job execution switched to status RUNNING.
08/08/2016 10:14:00 Job execution switched to status RUNNING.
10:14:00,632 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to SCHEDULED
08/08/2016 10:14:00 Source: Custom Source(1/1) switched to SCHEDULED
10:14:00,633 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job) changed to RUNNING.
10:14:00,637 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from SCHEDULED to DEPLOYING
10:14:00,638 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to DEPLOYING
08/08/2016 10:14:00 Source: Custom Source(1/1) switched to DEPLOYING
10:14:00,638 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: Custom Source (1/1) (attempt #0) to localhost
10:14:00,642 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from CREATED to SCHEDULED
10:14:00,642 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(1/4) switched to SCHEDULED
08/08/2016 10:14:00 Map(1/4) switched to SCHEDULED
10:14:00,643 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from SCHEDULED to DEPLOYING
10:14:00,643 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map (1/4) (attempt #0) to localhost
10:14:00,643 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(1/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(1/4) switched to DEPLOYING
10:14:00,648 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from CREATED to SCHEDULED
10:14:00,648 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from SCHEDULED to DEPLOYING
10:14:00,649 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(2/4) switched to SCHEDULED
08/08/2016 10:14:00 Map(2/4) switched to SCHEDULED
10:14:00,649 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(2/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(2/4) switched to DEPLOYING
10:14:00,649 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map (2/4) (attempt #0) to localhost
10:14:00,650 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from CREATED to SCHEDULED
10:14:00,650 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(3/4) switched to SCHEDULED
10:14:00,650 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Map(3/4) switched to SCHEDULED
10:14:00,655 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map (3/4) (attempt #0) to localhost
10:14:00,656 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from CREATED to SCHEDULED
10:14:00,657 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from SCHEDULED to DEPLOYING
10:14:00,657 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(3/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(3/4) switched to DEPLOYING
10:14:00,659 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map (4/4) (attempt #0) to localhost
10:14:00,659 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(4/4) switched to SCHEDULED
08/08/2016 10:14:00 Map(4/4) switched to SCHEDULED
10:14:00,660 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Map(4/4) switched to DEPLOYING
08/08/2016 10:14:00 Map(4/4) switched to DEPLOYING
10:14:00,660 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from CREATED to SCHEDULED
10:14:00,660 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to SCHEDULED
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to SCHEDULED
10:14:00,660 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from SCHEDULED to DEPLOYING
10:14:00,661 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to DEPLOYING
10:14:00,661 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Filter -> Map -> Sink: Unnamed (1/4) (attempt #0) to localhost
10:14:00,662 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from CREATED to SCHEDULED
10:14:00,662 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to SCHEDULED
10:14:00,662 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to SCHEDULED
10:14:00,662 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to DEPLOYING
10:14:00,662 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Filter -> Map -> Sink: Unnamed (2/4) (attempt #0) to localhost
10:14:00,663 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from CREATED to SCHEDULED
10:14:00,664 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to SCHEDULED
10:14:00,664 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to SCHEDULED
10:14:00,664 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Filter -> Map -> Sink: Unnamed (3/4) (attempt #0) to localhost
10:14:00,664 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to DEPLOYING
10:14:00,665 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from CREATED to SCHEDULED
10:14:00,666 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to SCHEDULED
10:14:00,666 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from SCHEDULED to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to SCHEDULED
10:14:00,666 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Filter -> Map -> Sink: Unnamed (4/4) (attempt #0) to localhost
10:14:00,666 INFO  org.apache.flink.runtime.client.JobClientActor - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to DEPLOYING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to DEPLOYING
10:14:00,674 INFO  org.apache.flink.runtime.taskmanager.TaskManag= er              - Received task Source: = Custom Source (1/1)
10:14:00,674 INFO  org.apache.flink.runtime.taskmanager.Task &nbs= p;                   - Loading= JAR files for task Source: Custom Source (1/1)
10:14:00,678 INFO  org.apache.flink.runtime.taskmanager.TaskManag= er              - Received task Map (1/4= )
10:14:00,678 INFO  org.apache.flink.runtime.taskmanager.Task &nbs= p;                   - Loading= JAR files for task Map (1/4)
10:14:00,683 INFO  org.apache.flink.runtime.taskmanager.TaskManag= er              - Received task Map (2/4= )
10:14:00,684 INFO  org.apache.flink.runtime.taskmanager.Task &nbs= p;                   - Loading= JAR files for task Map (2/4)
10:14:00,684 INFO  org.apache.flink.runtime.taskmanager.TaskManag= er              - Received task Map (3/4= )
10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.Task &nbs= p;                   - Registe= ring task at network: Source: Custom Source (1/1) [DEPLOYING]
10:14:00,692 INFO  org.apache.flink.runtime.taskmanager.Task &nbs= p;                   - Registe= ring task at network: Map (2/4) [DEPLOYING]
10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.Task &nbs= p;                   - Loading= JAR files for task Map (3/4)
10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.TaskManag= er              - Received task Map (4/4= )
10:14:00,691 INFO  org.apache.flink.runtime.taskmanager.Task &nbs= p;                   - Registe= ring task at network: Map (1/4) [DEPLOYING]
10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.TaskManag= er              - Received task Filter -= > Map -> Sink: Unnamed (1/4)
10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Map (3/4) [DEPLOYING]
10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Filter -> Map -> Sink: Unnamed (2/4)
10:14:00,695 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Map (4/4)
10:14:00,696 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Filter -> Map -> Sink: Unnamed (1/4)
10:14:00,703 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (2/4) switched to RUNNING
10:14:00,704 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Filter -> Map -> Sink: Unnamed (1/4) [DEPLOYING]
10:14:00,701 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) switched to RUNNING
10:14:00,698 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Filter -> Map -> Sink: Unnamed (2/4)
10:14:00,709 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Filter -> Map -> Sink: Unnamed (2/4) [DEPLOYING]
10:14:00,699 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Filter -> Map -> Sink: Unnamed (3/4)
10:14:00,710 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Filter -> Map -> Sink: Unnamed (3/4)
10:14:00,698 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Map (4/4) [DEPLOYING]
10:14:00,711 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Received task Filter -> Map -> Sink: Unnamed (4/4)
10:14:00,710 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (3/4) switched to RUNNING
10:14:00,715 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (1/4) switched to RUNNING
10:14:00,719 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Filter -> Map -> Sink: Unnamed (4/4)
10:14:00,724 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (1/4) switched to RUNNING
10:14:00,723 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (2/4) switched to RUNNING
10:14:00,720 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (4/4) switched to RUNNING
10:14:00,718 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from DEPLOYING to RUNNING
10:14:00,725 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from DEPLOYING to RUNNING
10:14:00,725 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(2/4) switched to RUNNING
08/08/2016 10:14:00 Map(2/4) switched to RUNNING
10:14:00,726 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Source: Custom Source(1/1) switched to RUNNING
08/08/2016 10:14:00 Source: Custom Source(1/1) switched to RUNNING
10:14:00,730 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from DEPLOYING to RUNNING
10:14:00,730 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(3/4) switched to RUNNING
08/08/2016 10:14:00 Map(3/4) switched to RUNNING
10:14:00,732 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from DEPLOYING to RUNNING
10:14:00,732 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(1/4) switched to RUNNING
08/08/2016 10:14:00 Map(1/4) switched to RUNNING
10:14:00,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from DEPLOYING to RUNNING
10:14:00,724 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Filter -> Map -> Sink: Unnamed (3/4) [DEPLOYING]
10:14:00,736 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Filter -> Map -> Sink: Unnamed (4/4) [DEPLOYING]
10:14:00,736 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to RUNNING
10:14:00,750 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (3/4) switched to RUNNING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(1/4) switched to RUNNING
10:14:00,750 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from DEPLOYING to RUNNING
10:14:00,751 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from DEPLOYING to RUNNING
10:14:00,751 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to RUNNING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(2/4) switched to RUNNING
10:14:00,752 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Map(4/4) switched to RUNNING
08/08/2016 10:14:00 Map(4/4) switched to RUNNING
10:14:00,751 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from DEPLOYING to RUNNING
10:14:00,757 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (4/4) switched to RUNNING
10:14:00,770 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from DEPLOYING to RUNNING
10:14:00,756 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,766 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,765 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to RUNNING
10:14:00,771 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(3/4) switched to RUNNING
10:14:00,771 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,776 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to RUNNING
08/08/2016 10:14:00 Filter -> Map -> Sink: Unnamed(4/4) switched to RUNNING
10:14:00,782 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,782 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,787 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,787 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,789 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,790 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,791 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,792 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,792 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,793 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,792 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,793 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,794 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,794 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,796 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,796 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,796 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,796 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,804 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,804 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,806 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,806 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,807 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,807 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,807 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,807 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,808 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,808 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:00,807 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been specified, using default state backend (Memory / JobManager)
10:14:00,809 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State backend is set to heap memory (checkpoint to jobmanager)
10:14:01,032 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,032 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,034 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,034 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
    at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
    at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
    at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
10:14:01,040 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,037 ERROR org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Exception while closing user function while failing or canceling task
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:265)
    at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:261)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:565)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:505)
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:498)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.close(RMQSink.java:114)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:426)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
10:14:01,041 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (3/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,040 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,043 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (2/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,039 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,041 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
java.lang.RuntimeException: Error while creating the channel
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
    ... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
    ... 1 more
10:14:01,045 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (1/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,046 INFO  org.apache.flink.runtime.taskmanager.Task                     - Filter -> Map -> Sink: Unnamed (4/4) switched to FAILED with exception.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,057 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Filter -> Map -> Sink: Unnamed (4/4)
10:14:01,058 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Filter -> Map -> Sink: Unnamed (1/4)
10:14:01,059 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Filter -> Map -> Sink: Unnamed (2/4)
10:14:01,063 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Filter -> Map -> Sink: Unnamed (3/4)
10:14:01,074 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (7c219dda43bc53571d306fc0df4468fa)
10:14:01,078 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (dafdfde1351e3eb9593fa227e8255b57)
10:14:01,080 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (54a627d8091077e6742cd74d754016a7)
10:14:01,082 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Un-registering task and sending final execution state FAILED to JobManager for task Filter -> Map -> Sink: Unnamed (b3a3e69c7dd082fc744b6ec791697e35)
10:14:01,085 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (4/4) (7c219dda43bc53571d306fc0df4468fa) switched from RUNNING to FAILED
10:14:01,085 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (3/4) (dafdfde1351e3eb9593fa227e8255b57) switched from RUNNING to FAILED
10:14:01,086 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (2/4) (54a627d8091077e6742cd74d754016a7) switched from RUNNING to FAILED
10:14:01,086 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(4/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(4/4) switched to FAILED 
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

10:14:01,087 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(3/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(3/4) switched to FAILED 
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

10:14:01,088 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(2/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(2/4) switched to FAILED 
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

10:14:01,089 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Filter -> Map -> Sink: Unnamed (1/4) (b3a3e69c7dd082fc744b6ec791697e35) switched from RUNNING to FAILED
10:14:01,090 INFO  org.apache.flink.runtime.client.JobClientActor                - 08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(1/4) switched to FAILED
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

08/08/2016 10:14:01 Filter -> Map -> Sink: Unnamed(1/4) switched to FAILED 
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more

10:14:01,092 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job) changed to FAILING.
java.lang.RuntimeException: Error while creating the channel
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
... 1 more
10:14:01,092 INFO  org.apache.flink.runtime.client.JobClientActor               - 08/08/2016 10:14:01 Job execution switched to status FAILING.
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
	at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
	at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
	... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
	at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
	... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
	... 1 more
10:14:01,092 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from RUNNING to CANCELING
08/08/2016 10:14:01 Job execution switched to status FAILING.
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
	at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
	at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
	... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
	at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
	... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
	... 1 more
10:14:01,093 INFO  org.apache.flink.runtime.client.JobClientActor               - 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELING
08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELING
10:14:01,095 INFO  org.apache.flink.runtime.taskmanager.Task                    - Attempting to cancel task Source: Custom Source (1/1)
10:14:01,096 INFO  org.apache.flink.runtime.taskmanager.Task                    - Source: Custom Source (1/1) switched to CANCELING
10:14:01,096 INFO  org.apache.flink.runtime.taskmanager.Task                    - Triggering cancellation of task code Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e).
10:14:01,097 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from RUNNING to CANCELING
10:14:01,097 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from RUNNING to CANCELING
10:14:01,097 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from RUNNING to CANCELING
10:14:01,098 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from RUNNING to CANCELING
10:14:01,098 INFO  org.apache.flink.runtime.client.JobClientActor               - 08/08/2016 10:14:01 Map(1/4) switched to CANCELING
08/08/2016 10:14:01 Map(1/4) switched to CANCELING
10:14:01,098 INFO  org.apache.flink.runtime.client.JobClientActor               - 08/08/2016 10:14:01 Map(2/4) switched to CANCELING
08/08/2016 10:14:01 Map(2/4) switched to CANCELING
10:14:01,099 INFO  org.apache.flink.runtime.client.JobClientActor               - 08/08/2016 10:14:01 Map(3/4) switched to CANCELING
08/08/2016 10:14:01 Map(3/4) switched to CANCELING
10:14:01,100 INFO  org.apache.flink.runtime.client.JobClientActor               - 08/08/2016 10:14:01 Map(4/4) switched to CANCELING
08/08/2016 10:14:01 Map(4/4) switched to CANCELING
10:14:01,107 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          - Timer service is shutting down.
10:14:01,109 INFO  org.apache.flink.runtime.taskmanager.Task                    - Attempting to cancel task Map (1/4)
10:14:01,109 INFO  org.apache.flink.runtime.taskmanager.Task                    - Map (1/4) switched to CANCELING
10:14:01,109 INFO  org.apache.flink.runtime.taskmanager.Task                    - Triggering cancellation of task code Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb).
10:14:01,110 INFO  org.apache.flink.runtime.taskmanager.Task                    - Attempting to cancel task Map (2/4)
10:14:01,110 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          - Timer service is shutting down.
10:14:01,110 INFO  org.apache.flink.runtime.taskmanager.Task                    - Map (2/4) switched to CANCELING
10:14:01,110 INFO  org.apache.flink.runtime.taskmanager.Task                    - Triggering cancellation of task code Map (2/4) (99136f14b2e32b44318b49b2ad39dde5).
10:14:01,111 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          - Timer service is shutting down.
10:14:01,111 INFO  org.apache.flink.runtime.taskmanager.Task                    - Attempting to cancel task Map (3/4)
10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task                    - Map (3/4) switched to CANCELING
10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task                    - Triggering cancellation of task code Map (3/4) (029bc351981b5056208839ce988f0f3f).
10:14:01,112 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          - Timer service is shutting down.
10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task                    - Attempting to cancel task Map (4/4)
10:14:01,112 INFO  org.apache.flink.runtime.taskmanager.Task                    - Map (4/4) switched to CANCELING
10:14:01,113 INFO  org.apache.flink.runtime.taskmanager.Task                    - Triggering cancellation of task code Map (4/4) (ae5e9324d04ffac7843317749a2e86dd).
10:14:01,114 INFO  org.apache.flink.runtime.taskmanager.Task                    - Map (1/4) switched to CANCELED
10:14:01,114 INFO  org.apache.flink.runtime.taskmanager.Task                    - Freeing task resources for Map (1/4)
10:14:01,116 INFO  org.apache.flink.runtime.taskmanager.TaskManager             - Discarding the results produced by task execution b3a3e69c7dd082fc744b6ec791697e35
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task                    - Map (2/4) switched to CANCELED
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.TaskManager             - Discarding the results produced by task execution 54a627d8091077e6742cd74d754016a7
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task                    - Map (3/4) switched to CANCELED
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task                    - Freeing task resources for Map (2/4)
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.Task                    - Freeing task resources for Map (3/4)
10:14:01,118 INFO  org.apache.flink.runtime.taskmanager.TaskManager             - Discarding the results produced by task execution dafdfde1351e3eb9593fa227e8255b57
10:14:01,119 INFO  org.apache.flink.runtime.taskmanager.TaskManager             - Discarding the results produced by task execution 7c219dda43bc53571d306fc0df4468fa
10:14:01,119 INFO  org.apache.flink.runtime.taskmanager.TaskManager             - Un-registering task and sending final execution state CANCELED to JobManager for task Map (4a13efc94d64d61532106fbd9bdfaedb)
10:14:01,120 INFO  org.apache.flink.runtime.taskmanager.TaskManager             - Un-registering task and sending final execution state CANCELED to JobManager for task Map (99136f14b2e32b44318b49b2ad39dde5)
10:14:01,120 INFO  org.apache.flink.runtime.taskmanager.TaskManager             - Un-registering task and sending final execution state CANCELED to JobManager for task Map (029bc351981b5056208839ce988f0f3f)
10:14:01,121 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       - Map (1/4) (4a13efc94d64d61532106fbd9bdfaedb) switched from CANCELING to CANCELED
10:14:01,121 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       - Map (2/4) (99136f14b2e32b44318b49b2ad39dde5) switched from CANCELING to CANCELED
10:14:01,121 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          - Timer service is shutting down.
10:14:01,122 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       - Map (3/4) (029bc351981b5056208839ce988f0f3f) switched from CANCELING to CANCELED
10:14:01,122 INFO  org.apache.flink.runtime.client.JobClientActor               - 08/08/2016 10:14:01 Map(1/4) switched to CANCELED
08/08/2016 10:14:01 Map(1/4) switched to CANCELED
10:14:01,123 INFO  org.apache.flink.runtime.client.JobClientActor               - 08/08/2016 10:14:01 Map(2/4) switched to CANCELED
08/08/2016 10:14:01 Map(2/4) switched to CANCELED
10:14:01,123 INFO  org.apache.flink.runtime.client.JobClientActor               - 08/08/2016 10:14:01 Map(3/4) switched to CANCELED
10:14:01,124 INFO  org.apache.flink.runtime.taskmanager.Task                    - Map (4/4) switched to CANCELED
08/08/2016 10:14:01 Map(3/4) switched to CANCELED
10:14:01,124 INFO  org.apache.flink.runtime.taskmanager.Task                    - Freeing task resources for Map (4/4)
10:14:01,124 INFO  org.apache.flink.runtime.taskmanager.TaskManager             - Un-registering task and sending final execution state CANCELED to JobManager for task Map (ae5e9324d04ffac7843317749a2e86dd)
10:14:01,131 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       - Map (4/4) (ae5e9324d04ffac7843317749a2e86dd) switched from CANCELING to CANCELED
10:14:01,132 INFO  org.apache.flink.runtime.client.JobClientActor               - 08/08/2016 10:14:01 Map(4/4) switched to CANCELED
08/08/2016 10:14:01 Map(4/4) switched to CANCELED
10:14:01,147 INFO  org.apache.flink.runtime.taskmanager.Task                    - Source: Custom Source (1/1) switched to CANCELED
10:14:01,148 INFO  org.apache.flink.runtime.taskmanager.Task                    - Freeing task resources for Source: Custom Source (1/1)
10:14:01,148 INFO  org.apache.flink.runtime.taskmanager.TaskManager             - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source (9ffba0e9f84adca163121d50f88e519e)
10:14:01,149 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       - Source: Custom Source (1/1) (9ffba0e9f84adca163121d50f88e519e) switched from CANCELING to CANCELED
10:14:01,150 INFO  org.apache.flink.runtime.client.JobClientActor               - 08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELED
08/08/2016 10:14:01 Source: Custom Source(1/1) switched to CANCELED
10:14:01,154 INFO  org.apache.flink.runtime.jobmanager.JobManager               - Status of job 6d641f666b26088a843355ff3b0b1705 (Flink Streaming Job) changed to FAILED.
java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
	at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
	at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
	... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
	at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
	... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
	... 1 more
10:14:01,154 INFO  org.apache.flink.runtime.client.JobClientActor               - 08/08/2016 10:14:01 Job execution switched to status FAILED.
08/08/2016 10:14:01 Job execution switched to status FAILED.
10:14:01,160 INFO  org.apache.flink.runtime.client.JobClient                    - Job execution failed
10:14:01,160 INFO  org.apache.flink.runtime.client.JobClientActor               - Terminate JobClientActor.
10:14:01,161 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster        - Stopping FlinkMiniCluster.
10:14:01,161 INFO  org.apache.flink.runtime.client.JobClientActor               - Disconnect from JobManager Actor[akka://flink/user/jobmanager_1#-1689614413].
10:14:01,174 INFO  org.apache.flink.runtime.jobmanager.JobManager               - Stopping JobManager akka://flink/user/jobmanager_1.
10:14:01,184 INFO  org.apache.flink.runtime.taskmanager.TaskManager             - Stopping TaskManager akka://flink/user/taskmanager_1#2008243751.
10:14:01,184 INFO  org.apache.flink.runtime.taskmanager.TaskManager             - Disassociating from JobManager
10:14:01,192 INFO  org.apache.flink.runtime.blob.BlobCache                      - Shutting down BlobCache
10:14:01,194 INFO  org.apache.flink.runtime.blob.BlobServer                     - Stopped BLOB server at 0.0.0.0:34360
10:14:01,208 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager         - I/O manager removed spill file directory /tmp/flink-io-e6686803-fb13-4e2f-a4df-cb6ca465eb2b
10:14:01,212 INFO  org.apache.flink.runtime.taskmanager.TaskManager             - Task manager akka://flink/user/taskmanager_1 is completely shut down.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error while creating the channel
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:84)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
	at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779)
	at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSink.open(RMQSink.java:82)
	... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
	at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
	... 9 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'flink-sink' in vhost '/' not equivalent, class-id=50, method-id=10)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
	... 1 more
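
Reading the trace above: the broker closes the channel with reply-code 406 in response to the queueDeclare call made from RMQSink.open. In AMQP, re-declaring a queue that already exists only succeeds when the declared attributes match the existing queue; otherwise the broker answers PRECONDITION_FAILED. The log does not show which attribute differs for 'flink-sink'. Below is a toy, in-memory Java model of that rule, offered as a sketch only. It is not the RabbitMQ client API: the names QueueDeclareModel, QueueParams and PreconditionFailed are hypothetical, and the durable versus non-durable mismatch is just an assumed example.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of AMQP queue.declare equivalence checking (hypothetical, not the RabbitMQ API). */
class QueueDeclareModel {

    /** The attributes compared on re-declaration in this sketch. */
    static final class QueueParams {
        final boolean durable;
        final boolean exclusive;
        final boolean autoDelete;

        QueueParams(boolean durable, boolean exclusive, boolean autoDelete) {
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof QueueParams)) {
                return false;
            }
            QueueParams p = (QueueParams) o;
            return durable == p.durable && exclusive == p.exclusive && autoDelete == p.autoDelete;
        }

        @Override
        public int hashCode() {
            return (durable ? 4 : 0) + (exclusive ? 2 : 0) + (autoDelete ? 1 : 0);
        }
    }

    /** Stands in for the 406 channel error seen in the trace. */
    static final class PreconditionFailed extends RuntimeException {
        PreconditionFailed(String queue) {
            super("406 PRECONDITION_FAILED - parameters for queue '" + queue + "' not equivalent");
        }
    }

    private final Map<String, QueueParams> queues = new HashMap<>();

    /** Creates the queue if absent, accepts an identical re-declaration, rejects a differing one. */
    void queueDeclare(String name, QueueParams params) {
        QueueParams existing = queues.putIfAbsent(name, params);
        if (existing != null && !existing.equals(params)) {
            throw new PreconditionFailed(name);
        }
    }

    public static void main(String[] args) {
        QueueDeclareModel broker = new QueueDeclareModel();
        // Assumed scenario: the queue was created as durable beforehand.
        broker.queueDeclare("flink-sink", new QueueParams(true, false, false));
        // Same attributes: the re-declaration is accepted.
        broker.queueDeclare("flink-sink", new QueueParams(true, false, false));
        try {
            // Different attributes (non-durable here): the re-declaration is rejected.
            broker.queueDeclare("flink-sink", new QueueParams(false, false, false));
        } catch (PreconditionFailed e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If that is what is happening here, the declarations would become equivalent either by deleting the pre-existing queue or by creating it with the same attributes the sink declares.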



From: Robert Metzger <rmetzger@apache.org>
Sent: Monday, August 8, 2016 9:48:39 AM
To: user@flink.apache.org
Subject: Re: Using RabbitMQ Sinks
 
Hi Paul,

the example in the code is outdated; StringToByteSerializer was probably removed quite a while ago. I'll update the documentation once we have figured out the other problem you reported.
What's the exception you are getting?

Regards,
Robert

On Mon, Aug 8, 2016 at 4:33 PM, Paul Joireman <paul.joireman@physiq.com> wrote:

Hi all,


The documentation describing the use of RabbitMQ as a sink gives the following example:


RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost").setPort(5000).setUserName(..)
    .setPassword(..).setVirtualHost("/").build();
stream.addSink(new RMQSink<String>(connectionConfig, "hello", new StringToByteSerializer()));
However, a search of the Flink GitHub mirror does not show where StringToByteSerializer is defined; it appears only in the documentation of this example. I've tried using a SimpleStringSchema, which seems to handle serialization, but this raises an exception when I attempt to run it.
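
For context, the job of the sink's third constructor argument can be sketched without Flink on the classpath: it turns each stream element into the byte[] payload that gets published. The interface and class below are hypothetical stand-ins written for this illustration, not Flink's own serialization interfaces, and UTF-8 is an assumed charset.

```java
import java.nio.charset.StandardCharsets;

/** Sketch of what a string-to-bytes serializer does (hypothetical types, not Flink's). */
class StringSchemaSketch {

    /** Hypothetical stand-in for a serialization schema: element in, message bytes out. */
    interface ToBytes<T> {
        byte[] serialize(T element);
    }

    /** Encodes each string as UTF-8 (assumed charset for this sketch). */
    static final class Utf8StringToBytes implements ToBytes<String> {
        @Override
        public byte[] serialize(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        ToBytes<String> schema = new Utf8StringToBytes();
        byte[] payload = schema.serialize("hello");
        System.out.println(payload.length); // prints 5
    }
}
```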


Does anyone have any experience with using a RabbitMQ sink? Any pointers as to what I'm doing wrong?


Thanks,

Paul


--_000_CY1PR02MB1995F96E96479E767C50F1CA801B0CY1PR02MB1995namp_--