flink-user mailing list archives

From "Lopez, Javier" <javier.lo...@zalando.de>
Subject Re: Problem with Kafka 0.9 Client
Date Tue, 23 Feb 2016 09:23:27 GMT
Hi Robert,

After we restarted our Kafka / ZooKeeper cluster, the consumer worked. Some
of our topics had problems. Flink's consumer for Kafka 0.9 works
as expected.

Thanks!
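
For readers who hit the same symptom: the quoted DEBUG log below ends with
error_code=15, which Robert identifies in the thread as
ConsumerCoordinatorNotAvailableCode, and a coordinator of node_id=-1. The
following is a minimal sketch, not part of Kafka or Flink, of how such a
response body could be checked when scanning logs. The class
GroupResponseCheck and its methods are hypothetical names, and the lookup
table holds only the codes needed here; the full list is in the
ErrorMapping.scala file linked in the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log-scanning helper; not an API of Kafka or Flink.
class GroupResponseCheck {
    // Deliberately partial table: only the codes relevant to this thread.
    private static final Map<Integer, String> NAMES = new HashMap<>();
    static {
        NAMES.put(0, "NoError");
        NAMES.put(15, "ConsumerCoordinatorNotAvailableCode");
    }

    // Matches the first "error_code=<int>" field in a logged response body.
    private static final Pattern ERROR_CODE = Pattern.compile("error_code=(-?\\d+)");

    // Returns the numeric error code, or empty if the line has no such field.
    static OptionalInt errorCode(String logLine) {
        Matcher m = ERROR_CODE.matcher(logLine);
        return m.find()
                ? OptionalInt.of(Integer.parseInt(m.group(1)))
                : OptionalInt.empty();
    }

    // Renders the code together with its name when the name is in the table.
    static String describe(String logLine) {
        OptionalInt code = errorCode(logLine);
        if (!code.isPresent()) {
            return "no error_code field found";
        }
        int c = code.getAsInt();
        return c + " (" + NAMES.getOrDefault(c, "not in this table, see ErrorMapping.scala") + ")";
    }

    public static void main(String[] args) {
        // Response body copied from the DEBUG log quoted in this thread.
        String body = "responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}";
        System.out.println(describe(body));
    }
}
```

In this thread the error went away after the Kafka / ZooKeeper cluster was
restarted, so a repeated code 15 was a broker-side condition here rather
than a Flink configuration problem.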

On 19 February 2016 at 12:03, Lopez, Javier <javier.lopez@zalando.de> wrote:

> Hi, these are the properties:
>
> Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers",
> ".87:9092,.41:9092,.35:9092"); //full IPs removed for security reasons
>         properties.setProperty("zookeeper.connect", ".37:2181");
>         properties.setProperty("group.id", "test");
>         properties.setProperty("client.id", "flink_test");
>         properties.setProperty("auto.offset.reset", "earliest");
>         properties.put("enable.auto.commit", "true");
>         properties.put("auto.commit.interval.ms", "1000");
>         properties.put("session.timeout.ms", "30000");
>
> We have tested with these as well:
>
> Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers",
> ".87:9092,.41:9092,.35:9092");
>         properties.setProperty("zookeeper.connect", ".37:2181");
>         properties.setProperty("group.id", "test");
>         properties.setProperty("client.id", "flink_test");
>         properties.setProperty("auto.offset.reset", "earliest");
>
>
> and these:
>
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers",
> ".87:9092,.41:9092,.35:9092");
>         properties.setProperty("zookeeper.connect", ".37:2181");
>         properties.setProperty("group.id", "test");
>         properties.setProperty("client.id", "flink_test");
>         properties.setProperty("auto.offset.reset", "earliest");
>         properties.put("enable.auto.commit", "true");
>         properties.put("auto.commit.interval.ms", "1000");
>         properties.put("session.timeout.ms", "30000");
>         properties.put("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>         properties.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>
> With all three different configurations we get the same result.
>
> On 19 February 2016 at 11:55, Robert Metzger <rmetzger@apache.org> wrote:
>
>> Thank you. Can you also send me the list of properties you are passing to
>> the Kafka consumer? Are you only setting "bootstrap.servers", or more?
>>
>> On Fri, Feb 19, 2016 at 11:46 AM, Lopez, Javier <javier.lopez@zalando.de>
>> wrote:
>>
>>> Hi Robert,
>>>
>>> Please find attached the full logs of one of our latest executions. We
>>> are basically trying to read from our Kafka cluster and then write the
>>> data to Elasticsearch.
>>>
>>> Thanks for your help!
>>>
>>> On 18 February 2016 at 11:19, Robert Metzger <rmetzger@apache.org>
>>> wrote:
>>>
>>>> Hi Javier,
>>>>
>>>> sorry for the late response. In the Error Mapping of Kafka, it says
>>>> that code 15 means: ConsumerCoordinatorNotAvailableCode.
>>>>
>>>> https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/common/ErrorMapping.scala
>>>>
>>>> How many brokers did you put into the list of bootstrap servers?
>>>> Can you maybe send me the full log of one of the Flink TaskManagers
>>>> reading from Kafka?
>>>>
>>>>
>>>> On Wed, Feb 17, 2016 at 11:10 AM, Lopez, Javier <
>>>> javier.lopez@zalando.de> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> We are using Flink 1.0-SNAPSHOT with Kafka 0.9 Consumer and we have
>>>>> not been able to retrieve data from our Kafka Cluster. The DEBUG data
>>>>> reports the following:
>>>>>
>>>>> 10:53:24,365 DEBUG org.apache.kafka.clients.NetworkClient
>>>>>            - Sending metadata request ClientRequest(expectResponse=true,
>>>>> callback=null,
>>>>> request=RequestSend(header={api_key=3,api_version=0,correlation_id=1673,client_id=flink_test},
>>>>> body={topics=[stream_test_3]}), isInitiatedByNetworkClient,
>>>>> createdTimeMs=1455702804364, sendTimeMs=0) to node 35
>>>>> 10:53:24,398 DEBUG org.apache.kafka.clients.Metadata
>>>>>           - Updated cluster metadata version 838 to Cluster(nodes =
>>>>> [Node(41, ip-XXXX.eu-west-1.compute.internal, 9092), Node(35,
>>>>> ip-XXXX.eu-west-1.compute.internal, 9092), Node(87,
>>>>> ip-XXXX.eu-west-1.compute.internal, 9092)], partitions = [Partition(topic =
>>>>> stream_test_3, partition = 0, leader = 87, replicas = [87,41,35,], isr =
>>>>> [87,41,35,], Partition(topic = stream_test_3, partition = 1, leader = 35,
>>>>> replicas = [35,41,87,], isr = [35,41,87,], Partition(topic = stream_test_3,
>>>>> partition = 4, leader = 87, replicas = [87,41,35,], isr = [87,41,35,],
>>>>> Partition(topic = stream_test_3, partition = 3, leader = 35, replicas =
>>>>> [35,87,41,], isr = [35,87,41,], Partition(topic = stream_test_3, partition
>>>>> = 2, leader = 41, replicas = [41,87,35,], isr = [41,87,35,]])
>>>>> 10:53:24,398 DEBUG
>>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Issuing
>>>>> group metadata request to broker 35
>>>>> 10:53:24,432 DEBUG
>>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Group
>>>>> metadata response ClientResponse(receivedTimeMs=1455702804432,
>>>>> disconnected=false, request=ClientRequest(expectResponse=true,
>>>>> callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@63b68d94,
>>>>> request=RequestSend(header={api_key=10,api_version=0,correlation_id=1674,client_id=flink_test},
>>>>> body={group_id=test}), createdTimeMs=1455702804398,
>>>>> sendTimeMs=1455702804398),
>>>>> responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
>>>>>
>>>>>
>>>>> We receive this message all the time. What we don't understand is
>>>>> this "responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}".
>>>>> Since it contains an error_code, we suppose there was a problem. Our Kafka
>>>>> cluster works and we have some clients extracting data from it, so we don't
>>>>> know if this could be a Kafka issue or a Flink issue.
>>>>>
>>>>> Does anyone know, or understand, this response we are getting from
>>>>> Kafka?
>>>>>
>>>>> Thanks.
>>>>>
>>>>
>>>>
>>>
>>
>
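
None of the three property sets quoted above turned out to be the cause,
since all three behaved the same and a cluster restart fixed the problem.
Still, a leaner set is probably enough for the Kafka 0.9 consumer. The
sketch below rests on two assumptions that are not stated in the thread: that
"zookeeper.connect" is only needed by the older 0.8 connector, and that Flink
sets the key/value deserializers itself, so setting them has no effect. The
class name Kafka09ConsumerProps and the broker host names are placeholders.

```java
import java.util.Properties;

// Hypothetical helper that builds a minimal property set for the 0.9 consumer.
class Kafka09ConsumerProps {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Brokers used for the initial metadata request.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Consumer group; the group coordinator lookup in the log uses this id.
        props.setProperty("group.id", groupId);
        // Start from the oldest available offset when no committed offset exists.
        props.setProperty("auto.offset.reset", "earliest");
        // Assumption: no "zookeeper.connect" and no deserializer entries are
        // needed for the 0.9 consumer, so they are left out here.
        return props;
    }

    public static void main(String[] args) {
        // Placeholder host names; substitute the real broker addresses.
        Properties props = build("broker1:9092,broker2:9092,broker3:9092", "test");
        props.stringPropertyNames().stream().sorted()
                .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

The resulting Properties object would then be passed to the
FlinkKafkaConsumer09 constructor together with the topic name and a
deserialization schema.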
