flink-user mailing list archives

From Eamon Kavanagh <kavanagh.c.ea...@gmail.com>
Subject Re: Cassandra Connector in Scala
Date Tue, 21 Jun 2016 04:36:05 GMT
Hey Jamie,

Here's a simple example that I modeled on the GitHub example (
https://github.com/apache/flink/blob/master/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleSinkExample.java).
Let me know if I'm doing something silly.



import com.datastax.driver.core.Cluster.Builder
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder

object Test extends App {

  val INSERT = "INSERT INTO test.writetuple (element1, element2) VALUES (?, ?)"

  val iter = Iterator(new Tuple2("a", 1), new Tuple2("b", 2), new Tuple2("c", 3))

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val source = env.fromCollection(iter)

  CassandraSink.addSink(source)
    .setQuery(INSERT)
    .setClusterBuilder(new ClusterBuilder() {
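      // buildCluster must return the Cluster; procedure syntax would return Unit.
      override def buildCluster(builder: Builder): com.datastax.driver.core.Cluster = {
        builder.addContactPoint("127.0.0.1").build()
      }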
    })
    .build()

  env.execute("WriteTupleIntoCassandra")
}
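
For context on the bound named in the compile error quoted below (T <: org.apache.flink.api.java.tuple.Tuple): Scala tuples such as (String, Int) are not subtypes of Flink's Java Tuple class, and the example above uses org.apache.flink.api.java.tuple.Tuple2 rather than Scala tuples. The following is only a minimal, Flink-free sketch of how such an upper bound behaves. JTuple, JTuple2, requireJavaTuple and toJTuple are hypothetical stand-ins made up for illustration, not Flink classes or methods, and this may or may not be the whole story behind the error in this thread.

```scala
object BoundSketch {
  // Hypothetical stand-ins for a Java-style tuple hierarchy (NOT the real Flink classes).
  abstract class JTuple
  final case class JTuple2[A, B](f0: A, f1: B) extends JTuple

  // Hypothetical method with the same kind of upper bound as in the error message.
  def requireJavaTuple[T <: JTuple](value: T): T = value

  // Converts a Scala pair into the stand-in Java-style tuple.
  def toJTuple[A, B](pair: (A, B)): JTuple2[A, B] = JTuple2(pair._1, pair._2)

  val scalaPair: (String, Int) = ("a", 1)

  // requireJavaTuple(scalaPair)
  // ^ rejected by the compiler: (String, Int) is not a subtype of JTuple.

  // After conversion the bound is satisfied and T is inferred as JTuple2[String, Int].
  val converted: JTuple2[String, Int] = requireJavaTuple(toJTuple(scalaPair))

  def main(args: Array[String]): Unit = println(converted)
}
```

In the sketch the conversion happens on a single value; with a real stream the same idea would presumably be applied per element before handing the stream to the sink.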


On Mon, Jun 20, 2016 at 10:53 PM, Jamie Grier <jamie@data-artisans.com>
wrote:

> This looks like a simple type mismatch.  It's impossible to help with this
> without seeing your code, though.  Can you post it here?  Thanks.
>
> -Jamie
>
>
> On Sun, Jun 19, 2016 at 3:17 PM, Eamon Kavanagh <
> kavanagh.c.eamon@gmail.com> wrote:
>
>> Hey Mailing List,
>>
>> I'm trying to use the Cassandra connector that came out recently (
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/cassandra.html)
>> in Scala but I'm having trouble with types when I use
>> CassandraSink.addSink(in: DataStream).
>>
>> If I don't define the type it can't seem to properly infer it and if I do
>> define the type I still get an error saying there's a type mismatch.  The
>> compile error is
>>
>> *error: type arguments [(String, String, Int),Any] do not conform to
>> method addSink's type parameter bounds [IN,T <:
>> org.apache.flink.api.java.tuple.Tuple]*
>>
>> Is this a Scala issue?  Should I switch over to Java?
>>
>>
>> Thanks!
>> Eamon
>>
>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> jamie@data-artisans.com
>
>
