flink-user mailing list archives

From Wendong <wendong....@gmail.com>
Subject Flink Kafka example in Scala
Date Thu, 16 Jul 2015 00:45:34 GMT
Hello,

Does anyone have a simple example of Flink Kafka written in Scala? 

I've been struggling to make my test program work. Below is my program, which
fails to compile at the addSink call (the KafkaWordCountProducer part is copied
from a Spark sample program):

import java.util.HashMap
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}

import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
      .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))
      .print

    println("Hi TestKafka")
    env.execute("Test Kafka")
  }
}

object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Kafka producer connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Send some messages
    while (true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt)
          .map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }
}

There is a compilation error:

[error] .................TestKafka.scala:15: type mismatch;
[error]  found   : org.apache.flink.streaming.util.serialization.SimpleStringSchema
[error]  required: org.apache.flink.streaming.util.serialization.SerializationSchema[String,Array[Byte]]
[error]       .addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))

I changed SimpleStringSchema to SerializationSchema, but that still doesn't
work.
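For reference, below is roughly the kind of schema the error message seems to
ask for: something of type SerializationSchema[String, Array[Byte]]. This is
only my sketch: the class name is mine, and I have inlined a stand-in for
Flink's SerializationSchema trait so the snippet compiles on its own.

```scala
// Stand-in for org.apache.flink.streaming.util.serialization.SerializationSchema,
// reproduced here only so this snippet is self-contained; the real Flink trait
// may differ in detail.
trait SerializationSchema[T, R] extends Serializable {
  def serialize(element: T): R
}

// A schema matching the required type from the error message,
// SerializationSchema[String, Array[Byte]] (class name is my own):
class StringToBytesSchema extends SerializationSchema[String, Array[Byte]] {
  override def serialize(element: String): Array[Byte] = element.getBytes("UTF-8")
}

object SchemaSketch {
  def main(args: Array[String]): Unit = {
    val schema = new StringToBytesSchema
    // Round-trip a sample string through the schema to show it is byte-for-byte faithful:
    println(new String(schema.serialize("test"), "UTF-8"))
  }
}
```

If something like this is the intended approach, I would presumably pass it to
the sink as `new KafkaSink[String]("localhost:2181", "test", new
StringToBytesSchema)`, but I have not gotten that to compile either.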

I am trying to transition from Spark to Flink, but Flink has far fewer samples
than Spark. It would be very helpful to have a KafkaWordCount example in Scala
similar to the one in Spark.

Thanks,

Wendong



