Subject: Re: kafka integration issue
From: Stephan Ewen
To: user@flink.apache.org
Date: Wed, 6 Jan 2016 10:54:20 +0100

"java.lang.NoSuchMethodError" in Java virtually always means that the code was compiled against a different version than the one it is executed against. The version in "~/git/flink/" must be slightly outdated. Can you pull the latest update of the 1.0-SNAPSHOT master and rebuild the code?

Stephan
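(Illustration only, not part of the original message: refreshing a local snapshot build could look roughly like the following, assuming ~/git/flink is a checkout of the Flink master and ~/git/flink-poc is the job project mentioned later in this thread.)

# pull the latest 1.0-SNAPSHOT sources and reinstall them into the local Maven repository
cd ~/git/flink
git pull --rebase origin master
mvn clean install -DskipTests

# rebuild the job jar against the refreshed snapshot
cd ~/git/flink-poc
mvn clean package -Pbuild-jar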
On Tue, Jan 5, 2016 at 9:48 PM, Robert Metzger <rmetzger@apache.org> wrote:

> Hi Alex,
>
> How recent is your Flink 1.0-SNAPSHOT build? Maybe the code on the (local)
> cluster (in /git/flink/flink-dist/target/flink-1.0-SNAPSHOT-bin/flink-1.0-SNAPSHOT/)
> is not up to date?
>
> I just tried it locally, and the job seems to execute:
>
> ./bin/flink run /home/robert/Downloads/flink-poc/target/flink-poc-1.0-SNAPSHOT.jar
> org.apache.flink.streaming.api.scala.DataStream@436bc360
> 01/05/2016 21:44:09  Job execution switched to status RUNNING.
> 01/05/2016 21:44:09  Source: Custom Source -> Map(1/1) switched to SCHEDULED
> 01/05/2016 21:44:09  Source: Custom Source -> Map(1/1) switched to DEPLOYING
> 01/05/2016 21:44:09  Source: Custom Source -> Map(1/1) switched to RUNNING
>
> By the way, in order to print the stream, you have to call counts.print()
> instead of print(counts).
>
> On Tue, Jan 5, 2016 at 9:35 PM, Alex Rovner <alex.rovner@magnetic.com> wrote:
>
>> I believe I have set the version uniformly, unless I am overlooking
>> something in the pom. Attaching my project.
>>
>> I have tried building with both "mvn clean package" and
>> "mvn clean package -Pbuild-jar" and I get the same exception.
>>
>> I am running my app with the following command:
>>
>> ~/git/flink/flink-dist/target/flink-1.0-SNAPSHOT-bin/flink-1.0-SNAPSHOT/bin/flink
>> run -c com.magnetic.KafkaWordCount ~/git/flink-poc/target/flink-poc-1.0-SNAPSHOT.jar
>>
>> On Tue, Jan 5, 2016 at 12:47 PM Robert Metzger <rmetzger@apache.org> wrote:
>>
>>> I think the problem is that you only set the version of the Kafka
>>> connector to 1.0-SNAPSHOT, not for the rest of the Flink dependencies.
>>>
>>> On Tue, Jan 5, 2016 at 6:18 PM, Alex Rovner <alex.rovner@magnetic.com> wrote:
>>>
>>>> Thanks Till for the info. I tried switching to 1.0-SNAPSHOT and am now
>>>> facing another error:
>>>>
>>>> Caused by: java.lang.NoSuchMethodError:
>>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:413)
>>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> On Tue, Jan 5, 2016 at 3:54 AM Till Rohrmann <trohrmann@apache.org> wrote:
>>>>
>>>>> Hi Alex,
>>>>>
>>>>> this is a bug in the `0.10` release. Is it possible for you to switch
>>>>> to version `1.0-SNAPSHOT`? With this version, the error should no
>>>>> longer occur.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Jan 5, 2016 at 1:31 AM, Alex Rovner <alex.rovner@magnetic.com> wrote:
>>>>>
>>>>>> Hello Flinkers!
>>>>>>
>>>>>> The program below produces the following error when running locally.
>>>>>> I am building the program with Maven against 0.10.0 and running in
>>>>>> streaming-only local mode ("start-local-streaming.sh"). I have
>>>>>> verified that Kafka and the topic are working properly using the
>>>>>> kafka-console-*.sh scripts. What am I doing wrong? Any help would be
>>>>>> appreciated.
>>>>>>
>>>>>> Caused by: java.lang.NumberFormatException: For input string: ""
>>>>>>     at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>>>>>>     at java.lang.Long.parseLong(Long.java:601)
>>>>>>     at java.lang.Long.valueOf(Long.java:803)
>>>>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.getOffsetFromZooKeeper(ZookeeperOffsetHandler.java:125)
>>>>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.seekFetcherToInitialOffsets(ZookeeperOffsetHandler.java:88)
>>>>>>
>>>>>> def main(args: Array[String]) {
>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>
>>>>>>   val properties = new Properties();
>>>>>>   properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>>>   properties.setProperty("zookeeper.connect", "localhost:2181");
>>>>>>   properties.setProperty("group.id", "test");
>>>>>>
>>>>>>   val stream = env
>>>>>>     .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
>>>>>>
>>>>>>   val counts = stream.map(f => f.split(","))
>>>>>>
>>>>>>   print(counts)
>>>>>>
>>>>>>   env.execute()
>>>>>> }
>>>>>>
>>>>>> --
>>>>>> Alex Rovner
>>>>>> Director, Data Engineering
>>>>>> o: 646.759.0052
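(For reference, not part of any message above: a minimal corrected sketch of the program from this thread. It keeps the FlinkKafkaConsumer082 source, topic and properties shown above, applies Robert's suggestion of calling counts.print() instead of print(counts), and uses the class name from the run command, com.magnetic.KafkaWordCount; it also assumes all Flink dependencies in the pom share one version, e.g. 1.0-SNAPSHOT, per Robert's and Stephan's advice.)

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")

    // Kafka source, same topic and schema as in the original program
    val stream = env
      .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))

    // split each record on commas, as in the original program
    val counts = stream.map(f => f.split(","))

    // print() on the DataStream itself adds a print sink; Scala's
    // print(counts) only prints the DataStream object's toString
    counts.print()

    env.execute()
  }
}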
"java.lang.NoSuchMet= hodError" in Java virtually always means that the code is compiled aga= inst a different version than executed.

The version= in "~/git/flink/"=C2=A0<= /span>must be slightly outdated. Can you p= ull the latest update of the 1.0-SNAPSHOT master and rebuild the code?

Stephan

On Tue, Jan 5, 2016 at 9:48 PM, Robert M= etzger <rmetzger@apache.org> wrote:
Hi Alex,

How recent is yo= ur Flink 1.0-SNAPSHOT build? Maybe the code on the (local) cluster (in=C2= =A0/git/flink/flink-dist/target/flink-1.0-SNAPSHOT-bin/flink-1.0-SNAPSHOT/) is not up to date?
=
I just tried it locally, and the job seems to execute:
=

./bin/flink run /home/robert/Downloads/flink-poc/t= arget/flink-poc-1.0-SNAPSHOT.jar
org.apache.flink.streaming.api.s= cala.DataStream@436bc3601/05/2016 21:44:09 Job execution switched to status RUNNING.
01/05/2016 = 21:44:09 Source: Custom Source = -> Map(1/1) switched to SCHEDULED=C2=A0
01/05/2016 21:44:09 Source: Custom Source -> Map(1= /1) switched to DEPLOYING=C2=A0
01/05/2016 21:44:09 Source: Custom Source -> Map(1/1) switche= d to RUNNING=C2=A0

By the way, in order to p= rint the stream, you have to call=C2=A0counts.print() instead of=C2=A0print= (counts).




=


On Tue, Jan 5, 2016 at 9:35 PM,= Alex Rovner <alex.rovner@magnetic.com> wrote:
I believe I have set the versi= on uniformly, unless I am overlooking something in the pom. Attaching my pr= oject.

I have tried building with both "mvn clean p= ackage" and "mvn clean package=C2=A0-Pbuild-jar" and I get the same exception.

I am running my app with the following command:

~/git/flink/flink-dist/target/flink-1.0-SNAPSHOT-bin/flink-1.0-SNA= PSHOT/bin/flink run -c com.magnetic.KafkaWordCount ~/git/flink-poc/target/f= link-poc-1.0-SNAPSHOT.jar


On Tue, Jan 5, 2016 at 12:4= 7 PM Robert Metzger <rmetzger@apache.org> wrote:
I think the problem is that you only set the version= of the Kafka connector to 1.0-SNAPSHOT, not for the rest of the Flink depe= ndencies.

On= Tue, Jan 5, 2016 at 6:18 PM, Alex Rovner <alex.rovner@magnetic.com= > wrote:
T= hanks Till for the info. I tried switching to 1.0-SNAPSHOT and now facing a= nother error:

Caused by: java.lang.NoSuchMethodErro= r: org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheck= pointingEnabled()Z
a= t org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkK= afkaConsumer.java:413)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSourc= e.java:58)
at org.ap= ache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.ja= va:55)
at org.apache= .flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
=
at org.apache.flink.runti= me.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)


= On Tue, Jan 5, 2016 at 3:54 AM Till Rohrmann <trohrmann@apache.org> wrote:
Hi Alex,

this is a bug in the `0.10` release. Is it possible for you to switch to= version `1.0-SNAPSHOT`. With this version, the error should no longer occu= r.

Cheers,
Till

On Tue, Jan 5, 2016 at 1:31 AM= , Alex Rovner <alex.rovner@magnetic.com> wrote:
Hello Flinkers!

The below program produces the following error when runnin= g locally. I am building the program using maven, using 0.10.0 and running = in streaming only local mode "start-local-streaming.sh".=C2=A0 I = have verified that kafka and the topic is working properly by using kafka-c= onsole-*.sh scripts. What am I doing wrong? Any help would be appreciated i= t.

Caused by: java.lang.NumberFormatException: For= input string: ""

at java.lang.NumberFormatException.forInputString(Nu= mberFormatException.java:65)

at java.lang.Long.parseLong(Long.java:601)

at java.lang.Long.valueOf(Long.java:803)

at org.apache.flink.streaming.connectors.kafka.inter= nals.ZookeeperOffsetHandler.getOffsetFromZooKeeper(ZookeeperOffsetHandler.j= ava:125)

at org.apache.flink.streaming.connectors.kafka.inter= nals.ZookeeperOffsetHandler.seekFetcherToInitialOffsets(ZookeeperOffsetHand= ler.java:88)


def main(args: Array[St=
ring]) {
val env =3D StreamExecutionEnvironment.ge= tExecutionEnvironment

val properties =3D new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092&qu= ot;);
properties.setProperty("= ;zookeeper.connect", "localhost:2181"
);
properties= .setProperty("group.id"= , "test");

val stream =3D env
.addSource(new FlinkKafkaConsumer082[String]("topic"<= /span>, new SimpleStringSchema(), properties))

val counts =3D stream.map(f=3D>f.split(","))

print(counts)

env.execute()
}
--
=
Alex Rovner
Director, Data Engineering=C2=A0
= o:=C2=A0646.759.0052

--
= Alex Rovner
Director, Data Engineering=C2=A0
o:= =C2=A0646.759.0052




--001a1140bfd88a529d0528a756c2--