beam-user mailing list archives

From Eduardo Soldera <eduardo.sold...@arquivei.com.br>
Subject Re: Problem with KafkaIO
Date Fri, 14 Sep 2018 18:14:08 GMT
Hi Raghu, thank you very much for the pull request.
We'll wait for the 2.7 Beam release.

Regards!

On Thu, Sep 13, 2018 at 6:19 PM Raghu Angadi <rangadi@google.com> wrote:

> Fix: https://github.com/apache/beam/pull/6391
>
> On Wed, Sep 12, 2018 at 3:30 PM Raghu Angadi <rangadi@google.com> wrote:
>
>> Filed BEAM-5375 <https://issues.apache.org/jira/browse/BEAM-5375>. I
>> will fix it later this week.
>>
>> On Wed, Sep 12, 2018 at 12:16 PM Raghu Angadi <rangadi@google.com> wrote:
>>
>>>
>>>
>>> On Wed, Sep 12, 2018 at 12:11 PM Raghu Angadi <rangadi@google.com>
>>> wrote:
>>>
>>>> Thanks for the job id. I looked at the worker logs (following the usual
>>>> support oncall access protocol that provides temporary access to things
>>>> like logs in GCP):
>>>>
>>>> The root issue looks like consumerPollLoop(), mentioned earlier, needs to
>>>> handle unchecked exceptions. In your case it is clear that the poll thread
>>>> exited with a runtime exception. The reader does not check for that and
>>>> continues to wait for the poll thread to enqueue messages. A fix should
>>>> surface an IOException from the source's read, and the runners will handle
>>>> it appropriately from there.  I will file a jira.
>>>>
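>>>> As a rough sketch of that pattern (not KafkaIO's actual code; the class
>>>> and method names here are made up for illustration), the poll loop can
>>>> record the throwable that killed it, and the reader can rethrow it as an
>>>> IOException instead of blocking forever on an empty queue:
>>>>
>>>> import java.io.IOException
>>>> import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
>>>> import java.util.concurrent.atomic.AtomicReference
>>>>
>>>> class PollingReader[A](poll: () => A) {
>>>>   private val queue = new LinkedBlockingQueue[A](100)
>>>>   private val pollError = new AtomicReference[Throwable](null)
>>>>
>>>>   private val pollThread = new Thread(new Runnable {
>>>>     def run(): Unit =
>>>>       try {
>>>>         while (true) queue.put(poll())        // blocks while the queue is full
>>>>       } catch {
>>>>         case t: Throwable => pollError.set(t) // remember why the loop died
>>>>       }
>>>>   })
>>>>   pollThread.setDaemon(true)
>>>>   pollThread.start()
>>>>
>>>>   /** Next record if one arrived within the timeout; throws if the poll thread died. */
>>>>   @throws[IOException]
>>>>   def advance(): Option[A] = {
>>>>     val record = queue.poll(10, TimeUnit.MILLISECONDS)
>>>>     if (record != null) Some(record)
>>>>     else {
>>>>       // Without this check the reader keeps waiting even though nothing will ever arrive.
>>>>       val err = pollError.get()
>>>>       if (err != null) throw new IOException("poll thread failed", err)
>>>>       None
>>>>     }
>>>>   }
>>>> }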
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
>>>>
>>>
>>> Ignore the KafkaUnboundedReader link above, it was pasted here by mistake.
>>>
>>>
>>>>
>>>> From the logs (with a comment below each one):
>>>>
>>>>    - 2018-09-12 06:13:07.345 PDT Reader-0: reading from kafka_topic-0
>>>>    starting at offset 2
>>>>       - Implies the reader is initialized and poll thread is started.
>>>>    - 2018-09-12 06:13:07.780 PDT Reader-0: first record offset 2
>>>>       - The reader actually got a message received by the poll thread
>>>>       from Kafka.
>>>>    - 2018-09-12 06:16:48.771 PDT Reader-0: exception while fetching
>>>>    latest offset for partition kafka_topic-0. will be retried.
>>>>       - This must have happened around the time the network was
>>>>       disrupted. This log is from another periodic task that fetches the
>>>>       latest offsets for partitions.
>>>>
>>>> The poll thread must have died around the time the network was disrupted.
>>>>
>>>> The following log comes from the Kafka client itself and is printed every
>>>> second when KafkaIO fetches the latest offset. It seems to have been added
>>>> in recent client versions and is probably unintentional. I don't think
>>>> there is any better way to fetch latest offsets than how KafkaIO does it
>>>> now. This is logged inside consumer.position(), called at [1].
>>>>
>>>>    - 2018-09-12 06:13:11.786 PDT [Consumer clientId=consumer-2,
>>>>    groupId=Reader-0_offset_consumer_1735388161_genericPipe] Resetting offset
>>>>    for partition com.arquivei.dataeng.andre-0 to offset 3.
>>>>
>>>> [1]:
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L678
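>>>>
>>>> For context, fetching the latest offset with a plain Kafka consumer boils
>>>> down to a seek-to-end followed by position(). This is a sketch of that
>>>> general approach (not KafkaIO's exact code), assuming the consumer has
>>>> already been assign()-ed the partition; the "Resetting offset" INFO line
>>>> is emitted by the consumer itself when position() resolves the seek:
>>>>
>>>> import java.util.Collections
>>>> import org.apache.kafka.clients.consumer.KafkaConsumer
>>>> import org.apache.kafka.common.TopicPartition
>>>>
>>>> def latestOffset(consumer: KafkaConsumer[String, String],
>>>>                  tp: TopicPartition): Long = {
>>>>   consumer.seekToEnd(Collections.singleton(tp)) // lazy; evaluated on the next position()/poll()
>>>>   consumer.position(tp) // resolves the seek and triggers the "Resetting offset ..." log
>>>> }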
>>>>
>>>
>>> This 'Resetting offset' log is harmless, but it is quite annoying to see
>>> in the worker logs. One way to avoid it is to set the Kafka consumer's log
>>> level to WARNING. Ideally KafkaIO itself should do something to avoid it
>>> without requiring a user option.
>>>
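>>> If you want to try that, one possibility is a sketch along these lines
>>> (assuming the Dataflow runner's worker logging options, and that the noisy
>>> logger lives under org.apache.kafka.clients.consumer; the exact logger name
>>> can vary by client version):
>>>
>>> import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions
>>> import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.{Level, WorkerLogLevelOverrides}
>>> import org.apache.beam.sdk.options.PipelineOptionsFactory
>>>
>>> // args are the usual main() arguments used to build the pipeline options.
>>> val options = PipelineOptionsFactory.fromArgs(args: _*)
>>>   .as(classOf[DataflowWorkerLoggingOptions])
>>> // Raise the consumer package to WARN on the workers so the per-second INFO line is dropped.
>>> options.setWorkerLogLevelOverrides(
>>>   new WorkerLogLevelOverrides()
>>>     .addOverrideForName("org.apache.kafka.clients.consumer", Level.WARN))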
>>>
>>>
>>>
>>>> On Wed, Sep 12, 2018 at 10:27 AM Eduardo Soldera <
>>>> eduardo.soldera@arquivei.com.br> wrote:
>>>>
>>>>> Hi Raghu! The job_id of our dev job is
>>>>> 2018-09-12_06_11_48-5600553605191377866.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Wed, Sep 12, 2018 at 2:18 PM Raghu Angadi <rangadi@google.com> wrote:
>>>>>
>>>>>> Thanks for debugging.
>>>>>> Can you provide the job_id of your dev job? The stacktrace shows that
>>>>>> there is no thread running 'consumerPollLoop()', which can explain the
>>>>>> stuck reader. You will likely find logs at lines 594 & 587 [1].
>>>>>> Dataflow caches its readers and the DirectRunner may not, which can
>>>>>> explain why the DirectRunner resumes reads. The expectation in KafkaIO
>>>>>> is that the Kafka client library takes care of retrying in case of
>>>>>> connection problems (as documented). It is possible that in some cases
>>>>>> poll() throws and we need to restart the client in KafkaIO.
>>>>>>
>>>>>> [1]:
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L594
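>>>>>>
>>>>>> As an illustration of that expectation (a sketch only; the property
>>>>>> values below are arbitrary examples, and server/topic are the same
>>>>>> placeholders as in your snippet), the client-side reconnect/retry
>>>>>> behaviour is tuned through ordinary consumer properties passed to
>>>>>> KafkaIO:
>>>>>>
>>>>>> import org.apache.beam.sdk.io.kafka.KafkaIO
>>>>>> import org.apache.kafka.common.serialization.StringDeserializer
>>>>>> import scala.collection.JavaConverters._
>>>>>>
>>>>>> val retryProps: java.util.Map[String, Object] = Map[String, Object](
>>>>>>   "reconnect.backoff.ms"     -> "1000",  // wait before reconnecting to a broker
>>>>>>   "reconnect.backoff.max.ms" -> "10000", // cap on the exponential reconnect backoff
>>>>>>   "retry.backoff.ms"         -> "500"    // wait before retrying a failed request
>>>>>> ).asJava
>>>>>>
>>>>>> KafkaIO.read[String, String]()
>>>>>>   .withBootstrapServers(server)
>>>>>>   .withTopic(topic)
>>>>>>   .withKeyDeserializer(classOf[StringDeserializer])
>>>>>>   .withValueDeserializer(classOf[StringDeserializer])
>>>>>>   .updateConsumerProperties(retryProps)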
>>>>>>
>>>>>> On Wed, Sep 12, 2018 at 9:59 AM Eduardo Soldera <
>>>>>> eduardo.soldera@arquivei.com.br> wrote:
>>>>>>
>>>>>>> Hi Raghu, thanks for your help.
>>>>>>> Just answering your previous question, the logs that followed were
>>>>>>> the same as before the error, as if the pipeline were still getting
>>>>>>> the messages, for example:
>>>>>>>
>>>>>>> (...)
>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 10.
>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 15.
>>>>>>> ERROR
>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 22.
>>>>>>> Resetting offset for partition com.arquivei.dataeng.andre-0 to offset 30.
>>>>>>> (...)
>>>>>>>
>>>>>>> But when checking the Kafka consumer group, the current offset stays
>>>>>>> at 15, the committed offset from the last processed message before
>>>>>>> the error.
>>>>>>>
>>>>>>> We'll file a bug, but we were now able to reproduce the issue in a
>>>>>>> dev scenario.
>>>>>>> We started the same pipeline using the DirectRunner, without Google
>>>>>>> Dataflow. We blocked the Kafka broker's network and the same error
>>>>>>> was thrown. Then we unblocked the network and the pipeline was able
>>>>>>> to successfully process the subsequent messages.
>>>>>>> When we started the same pipeline on the Dataflow runner and did the
>>>>>>> same test, the same problem from our production scenario happened:
>>>>>>> Dataflow couldn't process the new messages. Unfortunately, we've
>>>>>>> stopped the Dataflow job in production, but the problematic dev job
>>>>>>> is still running and the log file of the VM is attached. Thank you
>>>>>>> very much.
>>>>>>> Best regards
>>>>>>>
>>>>>>> On Tue, Sep 11, 2018 at 6:28 PM Raghu Angadi <rangadi@google.com> wrote:
>>>>>>>
>>>>>>>> Specifically, I am interested in whether you have any thread running
>>>>>>>> 'consumerPollLoop()' [1]. There should always be one (if a worker is
>>>>>>>> assigned one of the partitions). It is possible that the Kafka client
>>>>>>>> itself hasn't recovered from the group coordinator error (though
>>>>>>>> unlikely).
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L570
>>>>>>>>
>>>>>>>> On Tue, Sep 11, 2018 at 12:31 PM Raghu Angadi <rangadi@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Eduardo,
>>>>>>>>>
>>>>>>>>> In case of any error, the pipeline should keep trying to fetch.
>>>>>>>>> I don't know about this particular error. Do you see any others
>>>>>>>>> after it in the log?
>>>>>>>>> A couple of things you could try if the logs are not useful:
>>>>>>>>>  - log in to one of the VMs and get a stacktrace of the Java worker
>>>>>>>>> (look for a container called java-streaming)
>>>>>>>>>  - file a support bug or a Stack Overflow question with the job id
>>>>>>>>> so that the Dataflow oncall can take a look.
>>>>>>>>>
>>>>>>>>> Raghu.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Sep 11, 2018 at 12:10 PM Eduardo Soldera <
>>>>>>>>> eduardo.soldera@arquivei.com.br> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> We have an Apache Beam pipeline running in Google Dataflow using
>>>>>>>>>> KafkaIO. Suddenly the pipeline stopped fetching Kafka messages
>>>>>>>>>> altogether, while workers from our other pipelines continued to
>>>>>>>>>> receive Kafka messages.
>>>>>>>>>>
>>>>>>>>>> At the moment it stopped we got these messages:
>>>>>>>>>>
>>>>>>>>>> I  [Consumer clientId=consumer-1, groupId=genericPipe] Error sending fetch request (sessionId=1396189203, epoch=2431598) to node 3: org.apache.kafka.common.errors.DisconnectException.
>>>>>>>>>> I  [Consumer clientId=consumer-1, groupId=genericPipe] Group coordinator 10.0.52.70:9093 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
>>>>>>>>>> I  [Consumer clientId=consumer-1, groupId=genericPipe] Discovered group coordinator 10.0.52.70:9093 (id: 2147483646 rack: null)
>>>>>>>>>>
>>>>>>>>>> And then the pipeline stopped reading the messages.
>>>>>>>>>>
>>>>>>>>>> This is the KafkaIO setup we have:
>>>>>>>>>>
>>>>>>>>>> KafkaIO.read[String, String]()
>>>>>>>>>>   .withBootstrapServers(server)
>>>>>>>>>>   .withTopic(topic)
>>>>>>>>>>   .withKeyDeserializer(classOf[StringDeserializer])
>>>>>>>>>>   .withValueDeserializer(classOf[StringDeserializer])
>>>>>>>>>>   .updateConsumerProperties(properties)
>>>>>>>>>>   .commitOffsetsInFinalize()  // commit consumed offsets back to Kafka when checkpoints finalize
>>>>>>>>>>   .withoutMetadata()          // drop Kafka metadata, yielding plain KV[String, String] elements
>>>>>>>>>>
>>>>>>>>>>  Any help will be much appreciated.
>>>>>>>>>>
>>>>>>>>>> Best regards,
>>>>>>>>>> --
>>>>>>>>>> Eduardo Soldera Garcia
>>>>>>>>>> Data Engineer
>>>>>>>>>> (16) 3509-5555 | www.arquivei.com.br
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Eduardo Soldera Garcia
>>>>>>> Data Engineer
>>>>>>> (16) 3509-5555 | www.arquivei.com.br
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Eduardo Soldera Garcia
>>>>> Data Engineer
>>>>> (16) 3509-5555 | www.arquivei.com.br
>>>>>
>>>>

-- 
Eduardo Soldera Garcia
Data Engineer
(16) 3509-5555 | www.arquivei.com.br
