flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Kafka Test Error
Date Fri, 01 Apr 2016 16:54:31 GMT
The issue may be that you include Kafka twice:

1) You explicitly add "org.apache.kafka:kafka-clients:*0.9.0.0*"
2) You add "org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0", which
internally adds "org.apache.kafka:kafka-clients:*0.9.0.1*"

These two Kafka versions may conflict. I would drop the dependency (1) and
simply let the FlinkKafkaConsumer pull whatever dependency it needs by
itself.
The 0.9.0.1 client that Flink uses internally should read fine from Kafka
0.9.0.0 brokers.
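
If you want to confirm which kafka-clients jar actually ends up on the
classpath, a small diagnostic along the following lines might help. This is
only a sketch: the class name KafkaClasspathCheck and its two helper methods
are made up for illustration and are not part of Flink or Kafka. It uses plain
reflection, so it compiles without any Kafka dependency, and it prints where
KafkaConsumer was loaded from together with every partitionsFor overload that
class exposes.

```java
// Hypothetical diagnostic helper (not part of Flink or Kafka): reports which jar
// provides a class and which public overloads of a given method it exposes.
final class KafkaClasspathCheck {

    /** Returns the location a class was loaded from, or a short note if unknown. */
    static String locate(String className) {
        try {
            // 'false' avoids running static initializers; we only need the metadata.
            Class<?> cls = Class.forName(className, false,
                    KafkaClasspathCheck.class.getClassLoader());
            java.security.CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "(JDK or unknown location)";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    /** Lists every public method with the given name, including return and parameter types. */
    static java.util.List<String> signatures(String className, String methodName) {
        java.util.List<String> found = new java.util.ArrayList<>();
        try {
            Class<?> cls = Class.forName(className, false,
                    KafkaClasspathCheck.class.getClassLoader());
            for (java.lang.reflect.Method m : cls.getMethods()) {
                if (m.getName().equals(methodName)) {
                    found.add(m.toString());
                }
            }
        } catch (ClassNotFoundException | LinkageError e) {
            // Class is missing or cannot be linked: report no signatures.
        }
        return found;
    }

    public static void main(String[] args) {
        String consumer = "org.apache.kafka.clients.consumer.KafkaConsumer";
        System.out.println("KafkaConsumer loaded from: " + locate(consumer));
        for (String sig : signatures(consumer, "partitionsFor")) {
            System.out.println("  " + sig);
        }
    }
}
```

Run it with the same classpath as the failing job. The stack trace expects
partitionsFor(String) with a return type of java.util.List, so if the printed
signatures do not include that, the loaded jar is not the one the connector
was compiled against.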

Greetings,
Stephan


On Fri, Apr 1, 2016 at 5:19 PM, Zhun Shen <shenzhunallen@gmail.com> wrote:

> Yeah, I mean I read the demo that uses FlinkKafkaConsumer08
> (http://data-artisans.com/kafka-flink-a-practical-how-to/), then I wrote
> the program based on Kafka 0.9.0.0 and Flink 1.0.0.
>
> On Apr 1, 2016, at 7:27 PM, Balaji Rajagopalan <
> balaji.rajagopalan@olacabs.com> wrote:
>
> Did you make sure the Flink connector version and the Flink version are the
> same? Also, for 0.8.0.0 you will have to use FlinkKafkaConsumer08.
>
> On Fri, Apr 1, 2016 at 3:21 PM, Zhun Shen <shenzhunallen@gmail.com> wrote:
>
>> I followed the Kafka 0.8.0.0 example in the Flink docs.
>>
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id", "test");
>>         properties.setProperty("key.deserializer",
>> "org.apache.kafka.common.serialization.StringDeserializer");
>>         properties.setProperty("value.deserializer",
>> "org.apache.kafka.common.serialization.StringDeserializer");
>>         properties.setProperty("partition.assignment.strategy", "range");
>>
>>         DataStream<String> messageStream = env
>>                 .addSource(new FlinkKafkaConsumer09<String>("nginx-logs",
>> new SimpleStringSchema(), properties));
>>
>>         messageStream
>>                 .rebalance()
>>                 .map(new MapFunction<String, String>() {
>>
>>                     @Override
>>                     public String map(String value) throws Exception {
>>                         return "Kafka and Flink says: " + value;
>>                     }
>>                 }).print();
>>
>>         env.execute();
>>     }
>>
>>
>> I always get the error below:
>>
>> java.lang.NoSuchMethodError:
>> org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
>>
>>
>>
>>
>> On Apr 1, 2016, at 1:40 PM, Ashutosh Kumar <kmr.ashutosh16@gmail.com>
>> wrote:
>>
>> I am using Flink 1.0.0 with Kafka 0.9. It works fine for me. I use the
>> following dependency.
>>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>     <version>1.0.0</version>
>>     <scope>provided</scope>
>> </dependency>
>>
>> Thanks
>> Ashutosh
>>
>> On Fri, Apr 1, 2016 at 10:46 AM, Zhun Shen <shenzhunallen@gmail.com>
>> wrote:
>>
>>> Hi there,
>>>
>>> I checked my build.gradle file. I use
>>> 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0', but I found that
>>> this lib is based on kafka-clients 0.9.0.1.
>>>
>>> I want to use Flink streaming to consume Kafka’s events in real time, but
>>> I’m confused by Flink’s libs with different versions. Which
>>> flink-connector-kafka is compatible with Kafka 0.9.0.0?
>>> My environment is Kafka: 0.9.0.0, Flink: 1.0.0, Language: java
>>>
>>> Part of my build.gradle:
>>> 'org.apache.kafka:kafka_2.10:0.9.0.0',
>>> 'org.apache.kafka:kafka-clients:0.9.0.0',
>>> 'org.apache.flink:flink-java:1.0.0',
>>> 'org.apache.flink:flink-streaming-java_2.10:1.0.0',
>>> 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0',
>>> 'org.apache.flink:flink-connector-kafka-base_2.10:1.0.0'
>>>
>>> Any advice ?
>>>
>>> Thanks.
>>>
>>>
>>> On Mar 30, 2016, at 10:35 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>> Hi!
>>>
>>> A "NoSuchMethodError" usually means that you compile and run against
>>> different versions.
>>>
>>> Make sure the version you reference in the IDE and the version on the
>>> cluster are the same.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>>
>>> On Wed, Mar 30, 2016 at 9:42 AM, Balaji Rajagopalan <
>>> balaji.rajagopalan@olacabs.com> wrote:
>>>
>>>> I have tested kafka 0.8.0.2 with flink 1.0.0 and it works for me. Can't
>>>> talk about kafka 0.9.0.1.
>>>>
>>>> On Wed, Mar 30, 2016 at 12:51 PM, Zhun Shen <shenzhunallen@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi there,
>>>>>
>>>>> flink version: 1.0.0
>>>>> kafka version: 0.9.0.0
>>>>> env: local
>>>>>
>>>>> I run the script below:
>>>>> ./bin/flink run -c com.test.flink.FlinkTest test.jar --topic
>>>>> nginx-logs --bootstrap.servers localhost:9092 --zookeeper.connect
>>>>> localhost:2181 --group.id myGroup --partition.assignment.strategy
>>>>> round robin
>>>>>
>>>>> But I got the error:
>>>>> java.lang.NoSuchMethodError:
>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
>>>>>         at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
>>>>>         at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
>>>>>         at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
>>>>>
>>>>>
>>>>> The code is below:
>>>>>         DataStream<String> messageStream = env.addSource(new
>>>>> FlinkKafkaConsumer09<>("nginx-logs", new
>>>>> SimpleStringSchema(),parameterTool.getProperties()));
>>>>>         messageStream.rebalance().map(new MapFunction<String,
>>>>> String>() {
>>>>>
>>>>>             @Override
>>>>>             public String map(String value) throws Exception {
>>>>>                 return "Kafka and Flink says: " + value;
>>>>>             }
>>>>>         }).print();
>>>>>
>>>>>
>>>>> I searched for the error on Google, and it seems this method belongs to
>>>>> Kafka 0.9.0.1. Any idea? Thanks.
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>>
>
>
