flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Cassamdra Connector in Scala
Date Tue, 21 Jun 2016 16:49:02 GMT
Hi,
I think the root problem is that the CassandraSink methods that can work
with tuples accept the Tuple type that comes with Flink and not the Scala
Tuple types. If I'm not mistaken Robert is using the Flink Tuple types in
his example, that's why it works.

Cheers,
Aljoscha

On Tue, 21 Jun 2016 at 11:54 Robert Metzger <rmetzger@apache.org> wrote:

> Thank you for reporting the issue.
> We are very happy if people try out new code before the release. Please
> keep testing our Cassandra connector and report errors or usability issues.
>
> I was not able to compile your code using Scala 2.10, however, I got this
> version running: (I basically changed the iterator into a List).
> I didn't get any type-related exceptions.
>
> import com.datastax.driver.core.Cluster
> import org.apache.flink.streaming.api.scala._
> import collection.JavaConverters._
> import com.datastax.driver.core.Cluster.Builder
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.cassandra.{ClusterBuilder, CassandraSink}
>
> object MLQ {
>   def main(args: Array[String]) {
>
>
>     val INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)"
>
>     val list = List(new Tuple2("a", 1), new Tuple2("b", 2), new Tuple2("c", 3))
>
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val source = env.fromCollection(list.asJava)
>
>     CassandraSink.addSink(source)
>       .setQuery(INSERT)
>       .setClusterBuilder(new ClusterBuilder() {
>         override def buildCluster(builder: Builder): Cluster = {
>
>
>           builder.addContactPoint("127.0.0.1").build()
>         }
>       })
>       .build()
>
>     env.execute("WriteTupleIntoCassandra")
>   }
> }
>
> What I got is the following (and I think its perfectly fine for not having
> a cassandra cluster running):
>
> 11:41:33,513 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>       - class scala.Tuple2 must have a default constructor to be used as a
> POJO.
> 11:41:34,573 INFO
>  org.apache.flink.streaming.api.environment.LocalStreamEnvironment  -
> Running job on local embedded Flink mini cluster
> 11:41:34,975 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster
>       - Starting FlinkMiniCluster.
> 11:41:35,141 INFO  akka.event.slf4j.Slf4jLogger
>        - Slf4jLogger started
> 11:41:35,155 INFO  org.apache.flink.runtime.blob.BlobServer
>        - Created BLOB server storage directory
> /tmp/blobStore-01be4415-af52-4def-856e-94626e3f4f22
> 11:41:35,156 INFO  org.apache.flink.runtime.blob.BlobServer
>        - Started BLOB server at 0.0.0.0:40771 - max concurrent requests:
> 50 - max backlog: 1000
> 11:41:35,162 INFO
>  org.apache.flink.runtime.checkpoint.SavepointStoreFactory     - Using job
> manager savepoint state backend.
> 11:41:35,167 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist
>       - Started memory archivist akka://flink/user/archive_1
> 11:41:35,167 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Starting JobManager at akka://flink/user/jobmanager_1.
> 11:41:35,171 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - JobManager akka://flink/user/jobmanager_1 was granted leadership
> with leader session ID None.
> 11:41:35,175 INFO
>  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>  - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
> 11:41:35,180 INFO
>  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>  - Resource Manager associating with leading JobManager
> Actor[akka://flink/user/jobmanager_1#-885000089] - leader session null
> 11:41:35,182 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Messages between TaskManager and JobManager have a max timeout of
> 10000 milliseconds
> 11:41:35,185 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Temporary file directory '/tmp': total 7 GB, usable 7 GB (100.00%
> usable)
> 11:41:35,397 INFO
>  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated
> 64 MB for network buffer pool (number of memory segments: 2048, bytes per
> segment: 32768).
> 11:41:35,398 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Limiting managed memory to 1191 MB, memory will be allocated
> lazily.
> 11:41:35,400 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager
>        - I/O manager uses directory
> /tmp/flink-io-85973c2a-1672-4d1a-9985-03546acc4900 for spill files.
> 11:41:35,407 INFO  org.apache.flink.runtime.filecache.FileCache
>        - User file cache uses directory
> /tmp/flink-dist-cache-2775b3c6-ca0f-43a2-8432-623c8c99880c
> 11:41:35,826 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Starting TaskManager actor at
> akka://flink/user/taskmanager_1#2005524421.
> 11:41:35,826 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - TaskManager data connection information: localhost.localdomain
> (dataPort=34395)
> 11:41:35,826 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - TaskManager has 8 task slot(s).
> 11:41:35,827 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Memory usage stats: [HEAP: 85/240/3541 MB, NON HEAP: 27/28/-1 MB
> (used/committed/max)]
> 11:41:36,025 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Trying to register at JobManager akka://flink/user/jobmanager_1
> (attempt 1, timeout: 500 milliseconds)
> 11:41:36,026 INFO
>  org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
>  - TaskManager ResourceID{resourceId='0d1a407780ffa127acdd6b036c4867a8'}
> has registered.
> 11:41:36,028 INFO  org.apache.flink.runtime.instance.InstanceManager
>       - Registered TaskManager at localhost
> (akka://flink/user/taskmanager_1) as 6ef2a28a61851c9639ad74a7b3ba4cc8.
> Current number of registered hosts is 1. Current number of alive task slots
> is 8.
> 11:41:36,034 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Successful registration at JobManager
> (akka://flink/user/jobmanager_1), starting network stack and library cache.
> 11:41:36,039 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Determined BLOB server address to be localhost/127.0.0.1:40771.
> Starting BLOB cache.
> 11:41:36,040 INFO  org.apache.flink.runtime.blob.BlobCache
>       - Created BLOB cache storage directory
> /tmp/blobStore-295ac2e2-91af-4a9b-a761-60fbd1a86b78
> 11:41:36,042 INFO  org.apache.flink.metrics.MetricRegistry
>       - No metrics reporter configured, exposing metrics via JMX
> 11:41:36,051 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Received job WriteTupleIntoCassandra
> (502e42477e7a0161c3e678dcabbe1b0c).
> 11:41:36,052 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Could not submit job WriteTupleIntoCassandra
> (502e42477e7a0161c3e678dcabbe1b0c), because there is no connection to a
> JobManager.
> 11:41:36,052 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Disconnect from JobManager null.
> 11:41:36,054 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Connect to JobManager
> Actor[akka://flink/user/jobmanager_1#-885000089].
> 11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Connected to new JobManager akka://flink/user/jobmanager_1.
> 11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Sending message to JobManager akka://flink/user/jobmanager_1 to
> submit job WriteTupleIntoCassandra (502e42477e7a0161c3e678dcabbe1b0c) and
> wait for progress
> 11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Upload jar files to job manager akka://flink/user/jobmanager_1.
> 11:41:36,055 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Submit job to the job manager akka://flink/user/jobmanager_1.
> 11:41:36,057 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Submitting job 502e42477e7a0161c3e678dcabbe1b0c
> (WriteTupleIntoCassandra).
> 11:41:36,095 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Using restart strategy NoRestartStrategy for
> 502e42477e7a0161c3e678dcabbe1b0c.
> 11:41:36,125 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Scheduling job 502e42477e7a0161c3e678dcabbe1b0c
> (WriteTupleIntoCassandra).
> 11:41:36,125 INFO  org.apache.flink.runtime.client.JobClientActor
>        - Job was successfully submitted to the JobManager
> akka://flink/user/jobmanager_1.
> 11:41:36,126 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Job execution switched to status RUNNING.
> 06/21/2016 11:41:36 Job execution switched to status RUNNING.
> 11:41:36,127 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5)
> switched from CREATED to SCHEDULED
> 11:41:36,127 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to SCHEDULED
> 11:41:36,128 INFO  org.apache.flink.runtime.jobmanager.JobManager
>        - Status of job 502e42477e7a0161c3e678dcabbe1b0c
> (WriteTupleIntoCassandra) changed to RUNNING.
> 11:41:36,130 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,130 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Source: Collection Source (1/1) (attempt #0) to localhost
> 11:41:36,130 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to
> DEPLOYING
> 11:41:36,132 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef)
> switched from CREATED to SCHEDULED
> 11:41:36,133 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,133 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (1/8) (attempt #0) to localhost
> 11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498)
> switched from CREATED to SCHEDULED
> 11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (2/8) (attempt #0) to localhost
> 11:41:36,135 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def)
> switched from CREATED to SCHEDULED
> 11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (3/8) (attempt #0) to localhost
> 11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01)
> switched from CREATED to SCHEDULED
> 11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (4/8) (attempt #0) to localhost
> 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to DEPLOYING
> 11:41:36,136 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6)
> switched from CREATED to SCHEDULED
> 11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to SCHEDULED
> 11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to
> DEPLOYING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to DEPLOYING
> 11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to SCHEDULED
> 11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to
> DEPLOYING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to DEPLOYING
> 11:41:36,137 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to SCHEDULED
> 11:41:36,137 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to
> DEPLOYING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to DEPLOYING
> 11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (5/8) (attempt #0) to localhost
> 11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to SCHEDULED
> 11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to
> DEPLOYING
> 11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1)
> switched from CREATED to SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to DEPLOYING
> 11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to SCHEDULED
> 11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to
> DEPLOYING
> 11:41:36,138 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (6/8) (attempt #0) to localhost
> 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to DEPLOYING
> 11:41:36,138 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to SCHEDULED
> 11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62)
> switched from CREATED to SCHEDULED
> 11:41:36,139 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to
> DEPLOYING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to DEPLOYING
> 11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,139 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to
> SCHEDULED
> 11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (7/8) (attempt #0) to localhost
> 11:41:36,139 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f)
> switched from CREATED to SCHEDULED
> 11:41:36,140 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f)
> switched from SCHEDULED to DEPLOYING
> 11:41:36,140 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Deploying Sink: Cassandra Sink (8/8) (attempt #0) to localhost
> 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to SCHEDULED
> 11:41:36,140 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to
> DEPLOYING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to DEPLOYING
> 11:41:36,140 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to
> SCHEDULED
> 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to SCHEDULED
> 11:41:36,140 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to
> DEPLOYING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to DEPLOYING
> 11:41:36,155 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Source: Collection Source (1/1)
> 11:41:36,157 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Source: Collection Source (1/1)
> 11:41:36,162 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Source: Collection Source (1/1)
> [DEPLOYING]
> 11:41:36,162 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (1/8)
> 11:41:36,162 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (1/8)
> 11:41:36,163 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (2/8)
> 11:41:36,163 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (2/8)
> 11:41:36,163 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (1/8) [DEPLOYING]
> 11:41:36,164 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (2/8) [DEPLOYING]
> 11:41:36,165 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (1/8) switched to RUNNING
> 11:41:36,165 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (2/8) switched to RUNNING
> 11:41:36,169 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Source: Collection Source (1/1) switched to RUNNING
> 11:41:36,170 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,170 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,170 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,170 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,171 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (3/8)
> 11:41:36,171 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (4/8)
> 11:41:36,171 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (3/8)
> 11:41:36,172 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (3/8) [DEPLOYING]
> 11:41:36,172 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (3/8) switched to RUNNING
> 11:41:36,173 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (4/8)
> 11:41:36,175 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (4/8) [DEPLOYING]
> 11:41:36,176 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (4/8) switched to RUNNING
> 11:41:36,176 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (5/8)
> 11:41:36,176 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (5/8)
> 11:41:36,177 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,177 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (5/8) [DEPLOYING]
> 11:41:36,177 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (5/8) switched to RUNNING
> 11:41:36,177 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,178 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,178 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,179 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (6/8)
> 11:41:36,179 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,179 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,180 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (7/8)
> 11:41:36,181 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Received task Sink: Cassandra Sink (8/8)
> 11:41:36,182 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (7/8)
> 11:41:36,182 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (7/8) [DEPLOYING]
> 11:41:36,182 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (7/8) switched to RUNNING
> 11:41:36,183 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,183 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,184 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (6/8)
> 11:41:36,185 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Loading JAR files for task Sink: Cassandra Sink (8/8)
> 11:41:36,186 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef)
> switched from DEPLOYING to RUNNING
> 11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (6/8) [DEPLOYING]
> 11:41:36,187 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to
> RUNNING
> 11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (6/8) switched to RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(1/8) switched to RUNNING
> 11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Registering task at network: Sink: Cassandra Sink (8/8) [DEPLOYING]
> 11:41:36,187 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (8/8) switched to RUNNING
> 11:41:36,188 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,188 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,188 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,188 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,190 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def)
> switched from DEPLOYING to RUNNING
> 11:41:36,190 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01)
> switched from DEPLOYING to RUNNING
> 11:41:36,190 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to
> RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(3/8) switched to RUNNING
> 11:41:36,190 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to
> RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(4/8) switched to RUNNING
> 11:41:36,190 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - No state backend has been specified, using default state backend
> (Memory / JobManager)
> 11:41:36,190 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>       - State backend is set to heap memory (checkpoint to jobmanager)
> 11:41:36,193 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5)
> switched from DEPLOYING to RUNNING
> 11:41:36,193 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1)
> switched from DEPLOYING to RUNNING
> 11:41:36,194 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6)
> switched from DEPLOYING to RUNNING
> 11:41:36,194 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to
> RUNNING
> 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to RUNNING
> 11:41:36,194 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to
> RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(6/8) switched to RUNNING
> 11:41:36,194 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to
> RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(5/8) switched to RUNNING
> 11:41:36,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498)
> switched from DEPLOYING to RUNNING
> 11:41:36,195 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f)
> switched from DEPLOYING to RUNNING
> 11:41:36,195 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to
> RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(2/8) switched to RUNNING
> 11:41:36,195 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to
> RUNNING
> 11:41:36,196 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62)
> switched from DEPLOYING to RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(8/8) switched to RUNNING
> 11:41:36,196 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to
> RUNNING
> 06/21/2016 11:41:36 Sink: Cassandra Sink(7/8) switched to RUNNING
> 11:41:36,205 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Source: Collection Source (1/1) switched to FINISHED
> 11:41:36,205 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Source: Collection Source (1/1)
> 11:41:36,205 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FINISHED to
> JobManager for task Source: Collection Source
> (3ab880aed927f4375ec55fcd76c05fb5)
> 11:41:36,207 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Source: Collection Source (1/1) (3ab880aed927f4375ec55fcd76c05fb5)
> switched from RUNNING to FINISHED
> 11:41:36,207 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to
> FINISHED
> 06/21/2016 11:41:36 Source: Collection Source(1/1) switched to FINISHED
> 11:41:36,373 INFO  com.datastax.driver.core.NettyUtil
>        - Found Netty's native epoll transport in the classpath, using it
> 11:41:41,515 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:41,516 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:41,515 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:41,515 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:41,515 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:41,515 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:41,515 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:41,515 ERROR
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error
> while closing session.
> java.lang.NullPointerException
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.close(CassandraSinkBase.java:84)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:370)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:304)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,896 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,896 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (3/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,898 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,899 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (6/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,899 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,899 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (4/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,904 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,906 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (5/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,909 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,910 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,909 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,910 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (7/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,909 ERROR org.apache.flink.runtime.taskmanager.Task
>       - Task execution failed.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,910 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (8/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,910 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (1/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,911 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Sink: Cassandra Sink (2/8) switched to FAILED with exception.
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> 11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (5/8)
> 11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (2/8)
> 11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (7/8)
> 11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (8/8)
> 11:41:43,913 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (1/8)
> 11:41:43,914 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (3/8)
> 11:41:43,914 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (4/8)
> 11:41:43,914 INFO  org.apache.flink.runtime.taskmanager.Task
>       - Freeing task resources for Sink: Cassandra Sink (6/8)
> 11:41:43,920 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (da635c97adfd28fdd8b7ff1553a653a6)
> 11:41:43,921 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (5b11c48da29711e75b8c1dee3491b9b1)
> 11:41:43,921 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (5dc9a026f642fe50c29cbd895152cf01)
> 11:41:43,922 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (2695a57c8a28412e62d58f7dbe379def)
> 11:41:43,924 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (d5d15f4eeb628ecc89751aa22bc9fbef)
> 11:41:43,925 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (696224c4e158a7925b2d8ae7fc17991f)
> 11:41:43,925 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (2fcf053fb92f9a4eccd75781a15b5d62)
> 11:41:43,926 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>        - Un-registering task and sending final execution state FAILED to
> JobManager for task Sink: Cassandra Sink (31e4ff30d1360f1e4c10090c0a59a498)
> 11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (6/8) (5b11c48da29711e75b8c1dee3491b9b1)
> switched from RUNNING to FAILED
> 11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (5/8) (da635c97adfd28fdd8b7ff1553a653a6)
> switched from RUNNING to FAILED
> 11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (3/8) (2695a57c8a28412e62d58f7dbe379def)
> switched from RUNNING to FAILED
> 11:41:43,927 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:43 Sink: Cassandra Sink(6/8) switched to FAILED
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
>
> 06/21/2016 11:41:43 Sink: Cassandra Sink(6/8) switched to FAILED
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
>
> 11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (1/8) (d5d15f4eeb628ecc89751aa22bc9fbef)
> switched from RUNNING to FAILED
> 11:41:43,927 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (4/8) (5dc9a026f642fe50c29cbd895152cf01)
> switched from RUNNING to FAILED
> 11:41:43,928 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:43 Sink: Cassandra Sink(5/8) switched to FAILED
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
>
> 11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (7/8) (2fcf053fb92f9a4eccd75781a15b5d62)
> switched from RUNNING to FAILED
> 06/21/2016 11:41:43 Sink: Cassandra Sink(5/8) switched to FAILED
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
>
> 11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (8/8) (696224c4e158a7925b2d8ae7fc17991f)
> switched from RUNNING to FAILED
> 11:41:43,928 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>        - Sink: Cassandra Sink (2/8) (31e4ff30d1360f1e4c10090c0a59a498)
> switched from RUNNING to CANCELING
> 11:41:43,928 INFO  org.apache.flink.runtime.client.JobClientActor
>        - 06/21/2016 11:41:43 Sink: Cassandra Sink(3/8) switched to FAILED
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> at
> com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:67)
> at
> org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:48)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:340)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
>
> 06/21/2016 11:41:43 Sink: Cassandra Sink(3/8) switched to FAILED
> com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s)
> tried for query failed (tried: /127.0.0.1:9042
> (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1]
> Cannot connect))
> at
> com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
> <span cla
>

Mime
View raw message