flink-user mailing list archives

From Alex Rovner <alex.rov...@magnetic.com>
Subject kafka integration issue
Date Tue, 05 Jan 2016 00:31:19 GMT
Hello Flinkers!

The program below produces the following error when run locally. I am
building it with Maven against Flink 0.10.0 and running it in streaming-only
local mode ("start-local-streaming.sh"). I have verified that Kafka and the
topic are working properly using the kafka-console-*.sh scripts. What am
I doing wrong? Any help would be appreciated.

Caused by: java.lang.NumberFormatException: For input string: ""
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:601)
    at java.lang.Long.valueOf(Long.java:803)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.getOffsetFromZooKeeper(ZookeeperOffsetHandler.java:125)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.seekFetcherToInitialOffsets(ZookeeperOffsetHandler.java:88)
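
For reference, the top frames of the trace show Long.valueOf being called on
an empty string. A minimal sketch of that failure in isolation, assuming
nothing about why the value read from ZooKeeper was empty; EmptyOffsetDemo
and parseOffset are hypothetical names used only for illustration and are
not part of Flink:

```scala
import scala.util.Try

object EmptyOffsetDemo {
  // Hypothetical helper (not a Flink API): parse an offset string and
  // return None for empty or non-numeric input instead of throwing.
  def parseOffset(raw: String): Option[Long] =
    Try(java.lang.Long.valueOf(raw.trim).longValue).toOption

  def main(args: Array[String]): Unit = {
    // Same JDK call as in the trace: Long.valueOf on an empty string
    // throws java.lang.NumberFormatException.
    val failure = Try(java.lang.Long.valueOf(""))
    println(failure.failed.get.getMessage)

    // The defensive variant reports the missing value without throwing.
    println(parseOffset(""))
    println(parseOffset("42"))
  }
}
```

This only reproduces the exception itself; it does not show how the stored
offset for the group ended up empty.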


def main(args: Array[String]) {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val properties = new Properties();
  properties.setProperty("bootstrap.servers", "localhost:9092");
  properties.setProperty("zookeeper.connect", "localhost:2181");
  properties.setProperty("group.id", "test");

  val stream = env
    .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))

  val counts = stream.map(f=>f.split(","))

  counts.print()

  env.execute()
}

-- 
*Alex Rovner*
*Director, Data Engineering *
*o:* 646.759.0052

http://www.magnetic.com/
