flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Robert Metzger <rmetz...@apache.org>
Subject Re: kafka integration issue
Date Tue, 05 Jan 2016 20:48:57 GMT
Hi Alex,

How recent is your Flink 1.0-SNAPSHOT build? Maybe the code on the (local)
cluster (in /git/flink/flink-dist/target/flink-1.0-SNAPSHOT-bin/flink-1.0-SNAPSHOT/)
is not up to date?

I just tried it locally, and the job seems to execute:

./bin/flink run
/home/robert/Downloads/flink-poc/target/flink-poc-1.0-SNAPSHOT.jar
org.apache.flink.streaming.api.scala.DataStream@436bc3601/05/2016 21:44:09 Job
execution switched to status RUNNING.
01/05/2016 21:44:09 Source: Custom Source -> Map(1/1) switched to SCHEDULED
01/05/2016 21:44:09 Source: Custom Source -> Map(1/1) switched to DEPLOYING
01/05/2016 21:44:09 Source: Custom Source -> Map(1/1) switched to RUNNING

By the way, in order to print the stream, you have to call counts.print()
instead of print(counts).






On Tue, Jan 5, 2016 at 9:35 PM, Alex Rovner <alex.rovner@magnetic.com>
wrote:

> I believe I have set the version uniformly, unless I am overlooking
> something in the pom. Attaching my project.
>
> I have tried building with both "mvn clean package" and "mvn clean package -Pbuild-jar"
> and I get the same exception.
>
> I am running my app with the following command:
>
> ~/git/flink/flink-dist/target/flink-1.0-SNAPSHOT-bin/flink-1.0-SNAPSHOT/bin/flink
> run -c com.magnetic.KafkaWordCount
> ~/git/flink-poc/target/flink-poc-1.0-SNAPSHOT.jar
>
> On Tue, Jan 5, 2016 at 12:47 PM Robert Metzger <rmetzger@apache.org>
> wrote:
>
>> I think the problem is that you only set the version of the Kafka
>> connector to 1.0-SNAPSHOT, not for the rest of the Flink dependencies.
>>
>> On Tue, Jan 5, 2016 at 6:18 PM, Alex Rovner <alex.rovner@magnetic.com>
>> wrote:
>>
>>> Thanks Till for the info. I tried switching to 1.0-SNAPSHOT and now
>>> facing another error:
>>>
>>> Caused by: java.lang.NoSuchMethodError:
>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:413)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> On Tue, Jan 5, 2016 at 3:54 AM Till Rohrmann <trohrmann@apache.org>
>>> wrote:
>>>
>>>> Hi Alex,
>>>>
>>>> this is a bug in the `0.10` release. Is it possible for you to switch
>>>> to version `1.0-SNAPSHOT`. With this version, the error should no longer
>>>> occur.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Jan 5, 2016 at 1:31 AM, Alex Rovner <alex.rovner@magnetic.com>
>>>> wrote:
>>>>
>>>>> Hello Flinkers!
>>>>>
>>>>> The below program produces the following error when running locally.
I
>>>>> am building the program using maven, using 0.10.0 and running in streaming
>>>>> only local mode "start-local-streaming.sh".  I have verified that kafka
and
>>>>> the topic is working properly by using kafka-console-*.sh scripts. What
am
>>>>> I doing wrong? Any help would be appreciated it.
>>>>>
>>>>> Caused by: java.lang.NumberFormatException: For input string: ""
>>>>>
>>>>> at
>>>>> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>>>>>
>>>>> at java.lang.Long.parseLong(Long.java:601)
>>>>>
>>>>> at java.lang.Long.valueOf(Long.java:803)
>>>>>
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.getOffsetFromZooKeeper(ZookeeperOffsetHandler.java:125)
>>>>>
>>>>> at
>>>>> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.seekFetcherToInitialOffsets(ZookeeperOffsetHandler.java:88)
>>>>>
>>>>>
>>>>> def main(args: Array[String]) {
>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>
>>>>>   val properties = new Properties();
>>>>>   properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>>   properties.setProperty("zookeeper.connect", "localhost:2181");
>>>>>   properties.setProperty("group.id", "test");
>>>>>
>>>>>   val stream = env
>>>>>     .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(),
properties))
>>>>>
>>>>>   val counts = stream.map(f=>f.split(","))
>>>>>
>>>>>   print(counts)
>>>>>
>>>>>   env.execute()
>>>>> }
>>>>>
>>>>> --
>>>>> *Alex Rovner*
>>>>> *Director, Data Engineering *
>>>>> *o:* 646.759.0052
>>>>>
>>>>> * <http://www.magnetic.com/>*
>>>>>
>>>>>
>>>> --
>>> *Alex Rovner*
>>> *Director, Data Engineering *
>>> *o:* 646.759.0052
>>>
>>> * <http://www.magnetic.com/>*
>>>
>>>
>> --
> *Alex Rovner*
> *Director, Data Engineering *
> *o:* 646.759.0052
>
> * <http://www.magnetic.com/>*
>
>

Mime
View raw message