flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Zhun Shen <shenzhunal...@gmail.com>
Subject Re: Kafka Test Error
Date Mon, 04 Apr 2016 14:41:02 GMT
I created a new project, and only add kaka-client, Flink-kafka-connect and Flink streaming
libs, it works.

Thanks.


> On Apr 2, 2016, at 12:54 AM, Stephan Ewen <sewen@apache.org> wrote:
> 
> The issue may be that you include Kafka twice:
> 
> 1) You explicitly add "org.apache.kafka:kafka-clients:0.9.0.0"
> 2) You add "org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0", which internally
adds "org.apache.kafka:kafka-clients:0.9.0.1"
> 
> These two Kafka versions may conflict. I would drop the dependency (1) and simply let
the FlinkKafkaConsumer pull whatever dependency it needs by itself.
> The 0.9.0.1 client the Flink internally uses should read fine from Kafka 0.9.0.0 brokers.
> 
> Greetings,
> Stephan
> 
> 
> On Fri, Apr 1, 2016 at 5:19 PM, Zhun Shen <shenzhunallen@gmail.com <mailto:shenzhunallen@gmail.com>>
wrote:
> Yeah, I mean I read the demo with FlinkKafkaConsumer08(http://data-artisans.com/kafka-flink-a-practical-how-to/
<http://data-artisans.com/kafka-flink-a-practical-how-to/>) then I wrote the program
based on Kafka 0.9.0.0 and Flink 1.0.0.
> 
>> On Apr 1, 2016, at 7:27 PM, Balaji Rajagopalan <balaji.rajagopalan@olacabs.com
<mailto:balaji.rajagopalan@olacabs.com>> wrote:
>> 
>> Did you make sure the flinkconnector version and flink version is the same ? Also
for 0.8.0.0 you will have to use FlinkKafkaConsumer08
>> 
>> On Fri, Apr 1, 2016 at 3:21 PM, Zhun Shen <shenzhunallen@gmail.com <mailto:shenzhunallen@gmail.com>>
wrote:
>> I follow the example of kafka 0.8.0.0 on Flink doc.
>> 
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id <http://group.id/>", "test");
>>         properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>>         properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>>         properties.setProperty("partition.assignment.strategy", "range");
>> 
>>         DataStream<String> messageStream = env
>>                 .addSource(new FlinkKafkaConsumer09<String>("nginx-logs", new
SimpleStringSchema(), properties));
>> 
>>         messageStream
>>                 .rebalance()
>>                 .map(new MapFunction<String, String>() {
>> 
>>                     @Override
>>                     public String map(String value) throws Exception {
>>                         return "Kafka and Flink says: " + value;
>>                     }
>>                 }).print();
>> 
>>         env.execute();
>>     }
>> 
>> 
>> Always got the error below:
>> 
>> java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
>> 
>> 
>> 
>> 
>>> On Apr 1, 2016, at 1:40 PM, Ashutosh Kumar <kmr.ashutosh16@gmail.com <mailto:kmr.ashutosh16@gmail.com>>
wrote:
>>> 
>>> I am using flink 1.0.0 with kafka 0.9 . I works fine for me. I use following
dependency. 
>>> 
>>> <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>              <version>1.0.0</version>
>>>             <scope>provided</scope>
>>>   </dependency>
>>> 
>>> Thanks
>>> Ashutosh
>>> 
>>> On Fri, Apr 1, 2016 at 10:46 AM, Zhun Shen <shenzhunallen@gmail.com <mailto:shenzhunallen@gmail.com>>
wrote:
>>> Hi there,
>>> 
>>> I check my build.gradle file, I use 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0’,
but I found that this lib is based on kaka-clients 0.9.0.1.
>>> 
>>> I want to use Flink streaming to consume Kafka’s events in realtime, but I’m
confused by Flink’s libs with different versions. Which flink-connector-kafka is comparable
with kafka 0.9.0.0 ?
>>> My environment is Kafka: 0.9.0.0, Flink: 1.0.0, Language: java
>>> 
>>> part of my build.grade:
>>> 'org.apache.kafka:kafka_2.10:0.9.0.0',
>>> 'org.apache.kafka:kafka-clients:0.9.0.0',
>>> 'org.apache.flink:flink-java:1.0.0',
>>> 'org.apache.flink:flink-streaming-java_2.10:1.0.0',
>>> 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0',
>>> 'org.apache.flink:flink-connector-kafka-base_2.10:1.0.0
>>> 
>>> Any advice ? 
>>> 
>>> Thanks.
>>> 
>>> 
>>>> On Mar 30, 2016, at 10:35 PM, Stephan Ewen <sewen@apache.org <mailto:sewen@apache.org>>
wrote:
>>>> 
>>>> Hi!
>>>> 
>>>> A "NoSuchMethodError" usually means that you compile and run against different
versions.
>>>> 
>>>> Make sure the version you reference in the IDE and the version on the cluster
are the same.
>>>> 
>>>> Greetings,
>>>> Stephan
>>>> 
>>>> 
>>>> 
>>>> On Wed, Mar 30, 2016 at 9:42 AM, Balaji Rajagopalan <balaji.rajagopalan@olacabs.com
<mailto:balaji.rajagopalan@olacabs.com>> wrote:
>>>> I have tested kafka 0.8.0.2 with flink 1.0.0 and it works for me. Can't talk
about kafka 0.9.0.1. 
>>>> 
>>>> On Wed, Mar 30, 2016 at 12:51 PM, Zhun Shen <shenzhunallen@gmail.com <mailto:shenzhunallen@gmail.com>>
wrote:
>>>> Hi there,
>>>> 
>>>> flink version: 1.0.0
>>>> kafka version: 0.9.0.0
>>>> env: local
>>>> 
>>>> I run the script below:
>>>> ./bin/flink run -c com.test.flink.FlinkTest test.jar --topic nginx-logs --bootstrap.servers
localhost:9092 --zookeeper.connect localhost:2181 --group.id <http://group.id/> myGroup
--partition.assignment.strategy round robin
>>>> 
>>>> But I got the error:
>>>> ava.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
>>>> 
>>>> 
>>>> The code as  below:
>>>>         DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>("nginx-logs",
new SimpleStringSchema(),parameterTool.getProperties()));
>>>>         messageStream.rebalance().map(new MapFunction<String, String>()
{
>>>> 
>>>>             @Override
>>>>             public String map(String value) throws Exception {
>>>>                 return "Kafka and Flink says: " + value;
>>>>             }
>>>>         }).print();
>>>> 
>>>> 
>>>> I check the error with google, but it shows that it is a method of kafka
0.9.01. Any idea? Thanks.
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 


Mime
View raw message