flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Lopez, Javier" <javier.lo...@zalando.de>
Subject Re: Problem with Kafka 0.9 Client
Date Fri, 19 Feb 2016 11:03:06 GMT
Hi, these are the properties:

Properties properties = new Properties();
        properties.setProperty("bootstrap.servers",
".87:9092,.41:9092,.35:9092"); //full IPs removed for security reasons
        properties.setProperty("zookeeper.connect", ".37:2181");
        properties.setProperty("group.id", "test");
        properties.setProperty("client.id", "flink_test");
        properties.setProperty("auto.offset.reset", "earliest");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");

We have tested with these as well:

Properties properties = new Properties();
        properties.setProperty("bootstrap.servers",
".87:9092,.41:9092,.35:9092");
        properties.setProperty("zookeeper.connect", ".37:2181");
        properties.setProperty("group.id", "test");
        properties.setProperty("client.id", "flink_test");
        properties.setProperty("auto.offset.reset", "earliest");


and these:

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers",
".87:9092,.41:9092,.35:9092");
        properties.setProperty("zookeeper.connect", ".37:2181");
        properties.setProperty("group.id", "test");
        properties.setProperty("client.id", "flink_test");
        properties.setProperty("auto.offset.reset", "earliest");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");

With all three different configurations we get the same result.

On 19 February 2016 at 11:55, Robert Metzger <rmetzger@apache.org> wrote:

> Thank you. Can you send me also the list of properties you are passing to
> the kafka consumer? Are you only setting the "bootstrap.servers" or more?
>
> On Fri, Feb 19, 2016 at 11:46 AM, Lopez, Javier <javier.lopez@zalando.de>
> wrote:
>
>> Hi Robert,
>>
>> Please find attached the full logs of one of our latest executions. We
>> are basically trying to read from our kafka cluster and then writing the
>> data to elasticsearch.
>>
>> Thanks for your help!
>>
>> On 18 February 2016 at 11:19, Robert Metzger <rmetzger@apache.org> wrote:
>>
>>> Hi Javier,
>>>
>>> sorry for the late response. In the Error Mapping of Kafka, it says that
>>> code 15 means: ConsumerCoordinatorNotAvailableCode.
>>>
>>> https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/common/ErrorMapping.scala
>>>
>>> How many brokers did you put into the list of bootstrap servers?
>>> Can you maybe send me the full log of one of the Flink TaskManagers
>>> reading from Kafka?
>>>
>>>
>>> On Wed, Feb 17, 2016 at 11:10 AM, Lopez, Javier <javier.lopez@zalando.de
>>> > wrote:
>>>
>>>> Hi guys,
>>>>
>>>> We are using Flink 1.0-SNAPSHOT with Kafka 0.9 Consumer and we have not
>>>> been able to retrieve data from our Kafka Cluster. The DEBUG data reports
>>>> the following:
>>>>
>>>> 10:53:24,365 DEBUG org.apache.kafka.clients.NetworkClient
>>>>          - Sending metadata request ClientRequest(expectResponse=true,
>>>> callback=null,
>>>> request=RequestSend(header={api_key=3,api_version=0,correlation_id=1673,client_id=flink_test},
>>>> body={topics=[stream_test_3]}), isInitiatedByNetworkClient,
>>>> createdTimeMs=1455702804364, sendTimeMs=0) to node 35
>>>> 10:53:24,398 DEBUG org.apache.kafka.clients.Metadata
>>>>           - Updated cluster metadata version 838 to Cluster(nodes =
>>>> [Node(41, ip-XXXX.eu-west-1.compute.internal, 9092), Node(35,
>>>> ip-XXXX.eu-west-1.compute.internal, 9092), Node(87,
>>>> ip-XXXX.eu-west-1.compute.internal, 9092)], partitions = [Partition(topic
=
>>>> stream_test_3, partition = 0, leader = 87, replicas = [87,41,35,], isr =
>>>> [87,41,35,], Partition(topic = stream_test_3, partition = 1, leader = 35,
>>>> replicas = [35,41,87,], isr = [35,41,87,], Partition(topic = stream_test_3,
>>>> partition = 4, leader = 87, replicas = [87,41,35,], isr = [87,41,35,],
>>>> Partition(topic = stream_test_3, partition = 3, leader = 35, replicas =
>>>> [35,87,41,], isr = [35,87,41,], Partition(topic = stream_test_3, partition
>>>> = 2, leader = 41, replicas = [41,87,35,], isr = [41,87,35,]])
>>>> 10:53:24,398 DEBUG
>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Issuing
>>>> group metadata request to broker 35
>>>> 10:53:24,432 DEBUG
>>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Group
>>>> metadata response ClientResponse(receivedTimeMs=1455702804432,
>>>> disconnected=false, request=ClientRequest(expectResponse=true,
>>>> callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@63b68d94,
>>>> request=RequestSend(header={api_key=10,api_version=0,correlation_id=1674,client_id=flink_test},
>>>> body={group_id=test}), createdTimeMs=1455702804398,
>>>> sendTimeMs=1455702804398),
>>>> responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
>>>>
>>>>
>>>> We receive this message all the time. What we don't know understand is
>>>> this "responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}",
>>>> as we see an error_code we suppose there was a problem. Our Kafka cluster
>>>> works and we have some clients extracting data from it, so we don't know
if
>>>> this could be a Kafka issue or a Flink issue.
>>>>
>>>> Does anyone know, or understand, this response we are getting from
>>>> Kafka?
>>>>
>>>> Thanks.
>>>>
>>>
>>>
>>
>

Mime
View raw message