flume-user mailing list archives

From Anat Rozenzon <a...@viber.com>
Subject Re: Problem Events
Date Wed, 07 Aug 2013 05:40:53 GMT
After some reading in the docs, I think the existing fail-over behavior
can't be used to solve the 'poison' message problem, because it puts the
'failed' sink in a 'cooldown' period.
As the problem is in the message and not the sink, this means that after a
poison message arrives, the HDFS sink will 'fail' and thus the next X
messages will go to the failover sink.
My only solution for now is to avoid my current problem and hope that I
won't have any other problematic messages. I'd be glad to have a less
fragile solution.

Many thanks!
Other than that, Flume looks like a great tool :-)

Anat
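
The cooldown effect described in this message can be illustrated with a small Java sketch. It is a simplified model, not Flume's failover sink processor: the class name, the fixed cooldown, and the convention that events starting with "POISON" make the primary sink fail are all assumptions made for illustration.

```java
// Illustrative model only: one primary sink, one failover sink, fixed cooldown.
final class FailoverCooldownSketch {
    private final long cooldownMillis;
    private long primaryBlockedUntil = 0;

    FailoverCooldownSketch(long cooldownMillis) {
        this.cooldownMillis = cooldownMillis;
    }

    // Returns the name of the sink that ends up handling the event.
    String deliver(String event, long nowMillis) {
        if (nowMillis >= primaryBlockedUntil) {
            if (!event.startsWith("POISON")) {
                return "primary";
            }
            // The failure is attributed to the sink, so the sink is benched
            // even though the event itself was the problem.
            primaryBlockedUntil = nowMillis + cooldownMillis;
        }
        return "failover";
    }

    public static void main(String[] args) {
        FailoverCooldownSketch p = new FailoverCooldownSketch(1000);
        System.out.println(p.deliver("good-1", 0));
        System.out.println(p.deliver("POISON", 100));
        System.out.println(p.deliver("good-2", 200));  // healthy, but diverted
        System.out.println(p.deliver("good-3", 1100)); // cooldown has expired
    }
}
```

In this model, every healthy event that arrives during the cooldown is diverted along with the poison one, which is the side effect the message objects to.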


On Sun, Aug 4, 2013 at 8:45 AM, Anat Rozenzon <anat@viber.com> wrote:

> I think using a fail-over processor is a very good idea; I think I'll use
> it as an immediate solution.
> For the long run, I would like to see a general solution (not specific to
> file channel, in my case it is an HDFS channel), so the suggestion to add
> a 'Poison Message' sink to the sink processor sounds good.
>
> Just FYI, my problem is that a log file going through my source did not
> have (in all rows) the structure I expected.
>
> Since I used a regexp extractor to set the timestamp, the 'bad' row didn't
> match the regexp and the timestamp was not set, so the HDFS sink throws an
> NPE on that:
> 01 Aug 2013 09:36:24,259 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:422)  - process failed
> java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>         at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>         at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:722)
> 01 Aug 2013 09:36:24,262 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:426)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>         at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>         at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>         ... 3 more
>
>
> I fixed my regexp now, still, I can never be sure all the log files the
> system creates will be perfect without any bad lines.
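
The root cause described above (a row that does not match the regexp ends up with no timestamp header) can be sketched in plain Java. This is not Flume's regex extractor: a `Map` stands in for the event headers, and the class name, the assumed log layout, and the `parseStatus` marker header are made up for illustration.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: a Map stands in for Flume event headers.
final class TimestampHeaderSketch {
    // Assumed log layout: each row starts with "yyyy-MM-dd HH:mm:ss".
    private static final Pattern TS =
        Pattern.compile("^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})\\b");
    private static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Builds headers for one log row. A row that does not match gets no
    // "timestamp" header, which is the condition behind the NPE above.
    static Map<String, String> headersFor(String row) {
        Map<String, String> headers = new HashMap<>();
        Matcher m = TS.matcher(row);
        if (m.find()) {
            try {
                long millis = LocalDateTime.parse(m.group(1), FMT)
                    .toInstant(ZoneOffset.UTC).toEpochMilli();
                headers.put("timestamp", Long.toString(millis));
                return headers;
            } catch (DateTimeParseException e) {
                // Digits matched but do not form a real date; treat as unmatched.
            }
        }
        // Hypothetical marker header so bad rows can be found later.
        headers.put("parseStatus", "unmatched");
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(headersFor("2013-08-01 09:36:24 INFO started"));
        System.out.println(headersFor("garbage row without a date"));
    }
}
```

Marking the unmatched row explicitly, instead of leaving it silently without a timestamp, gives a later stage something to check for before the event reaches a sink that requires the header.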
>
>
> On Sat, Aug 3, 2013 at 9:56 AM, Connor Woodson <cwoodson.dev@gmail.com> wrote:
>
>> Just some more thoughts. It could be even easier:
>>
>> For the failover sink processor, you would have settings like "max
>> attempts" and "time between attempts", and it would just try one event X
>> times before it gives up and sends it to the next sink. The time between
>> attempts could even back off if needed.
>>
>> The advantage of this is that it preserves the ordering of events,
>> something which gets completely broken in the previous scenario.
>>
>> - Connor
>>
>>
>> On Fri, Aug 2, 2013 at 6:27 PM, Connor Woodson <cwoodson.dev@gmail.com> wrote:
>>
>>> As another option to solve the problem of having a bad event in a
>>> channel: using a fail-over sink processor, log all bad events to a local
>>> file. And to be extra cautious, add a third failover of a null sink. This
>>> will mean that events will always flow through your channel. The file sink
>>> should almost never fail, so you shouldn't be losing events in the process.
>>> And then you can re-process everything in the file if you still want those
>>> events for something.
>>>
>>> For the system of having Flume detect bad events, I think implementing
>>> something like the above is better than discarding events that fail X times.
>>> For instance, if you have an Avro sink -> Avro source, and you're
>>> restarting your source, Flume would end up discarding events unnecessarily.
>>> Instead, how about implementing the above system and then going a step
>>> further: Flume will attempt to re-send the bad events itself. And then if a
>>> bad event can't be sent after X attempts, it can be discarded.
>>>
>>> I envision this system as an extension to the current File Channel; when
>>> an event fails, it is written to a secondary File Channel from which events
>>> can be pulled when the main channel isn't in use. It would add headers like
>>> "lastAttempt" and "numberOfAttempts" to events. Then it can be configurable
>>> for a "min time between attempts" and "maximum attempts." When an event
>>> fails the second time, those headers are updated and it goes back into the
>>> fail-channel. If it comes out of the fail-channel but the lastAttempt is
>>> too recent, it goes back in. If it fails more times than the maximum, it is
>>> written to a final location (perhaps it's just sent to another sink; maybe
>>> this would have to be in a sink processor). Assuming all of those steps are
>>> error-free, then all messages are preserved, and the badly-formatted ones
>>> eventually get stored somewhere else. (This system could be hacked together
>>> with current code - fail over sink processor -> avro sink -> avro source on
>>> the same instance, but that's a little too hacky).
>>>
>>> Just some thoughts.
>>>
>>> - Connor
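
The retry bookkeeping proposed in this message could look roughly like the following Java sketch. It models only the decision logic with a `Map` of headers. The header names `lastAttempt` and `numberOfAttempts` come from the message, while the class, the enum, and the choice to give up once the count reaches the maximum are assumptions (the proposal leaves the exact boundary open). Nothing here is an existing Flume API.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: models the proposed retry headers with a plain Map.
final class RetryPolicySketch {
    enum Decision { RETRY_NOW, WAIT, GIVE_UP }

    private final int maxAttempts;
    private final long minMillisBetweenAttempts;

    RetryPolicySketch(int maxAttempts, long minMillisBetweenAttempts) {
        this.maxAttempts = maxAttempts;
        this.minMillisBetweenAttempts = minMillisBetweenAttempts;
    }

    // Records one failed delivery by updating the two headers.
    void recordFailure(Map<String, String> headers, long nowMillis) {
        int attempts = Integer.parseInt(headers.getOrDefault("numberOfAttempts", "0"));
        headers.put("numberOfAttempts", Integer.toString(attempts + 1));
        headers.put("lastAttempt", Long.toString(nowMillis));
    }

    // Decides what to do with an event pulled out of the fail-channel.
    Decision decide(Map<String, String> headers, long nowMillis) {
        int attempts = Integer.parseInt(headers.getOrDefault("numberOfAttempts", "0"));
        if (attempts >= maxAttempts) {
            return Decision.GIVE_UP; // send to the final location
        }
        long last = Long.parseLong(headers.getOrDefault("lastAttempt", "0"));
        if (attempts > 0 && nowMillis - last < minMillisBetweenAttempts) {
            return Decision.WAIT; // too recent: goes back into the fail-channel
        }
        return Decision.RETRY_NOW;
    }

    public static void main(String[] args) {
        RetryPolicySketch policy = new RetryPolicySketch(3, 1000);
        Map<String, String> headers = new HashMap<>();
        policy.recordFailure(headers, 5000);
        System.out.println(policy.decide(headers, 5500));
        System.out.println(policy.decide(headers, 6000));
    }
}
```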
>>>
>>>
>>>
>>>
>>> On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <arvind@apache.org> wrote:
>>>
>>>> This sounds like a critical problem that can cause pipelines to block
>>>> permanently. If you find yourself in this situation, a possible workaround
>>>> would be to decommission the channel, remove its data and route the flow
>>>> through a new empty channel. If you are able to identify which component
>>>> is causing the problem, see if you can remove it temporarily to let the
>>>> problem events pass through another peer component.
>>>>
>>>> I have also created FLUME-2140 [1] which will eventually allow the
>>>> pipelines to identify and divert such bad events. If you have any logs,
>>>> data, configurations that can be shared and will help provide more details
>>>> for this problem, it will be great if you could attach them to this jira
>>>> and provide your comments.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLUME-2140
>>>>
>>>> Regards,
>>>> Arvind Prabhakar
>>>>
>>>> On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <
>>>> pchavez@verticalsearchworks.com> wrote:
>>>>
>>>>> There's no way to deal with a bad event once it's in the channel, but
>>>>> you can mitigate future issues by having a timestamp interceptor bound to
>>>>> the source feeding the channel. There is a parameter 'preserve existing'
>>>>> that will only add the header if it doesn't exist. If you don't want to
>>>>> have 'bad' time data in there you could try a static interceptor with a
>>>>> specific past date so that corrupt events fall into a deterministic path in
>>>>> HDFS.
>>>>>
>>>>> I use this technique to prevent stuck events for both timestamp
>>>>> headers as well as some of our own custom headers we use for tokenized
>>>>> paths. The static interceptor will insert an arbitrary header if it doesn't
>>>>> exist so I have a couple that put in the value 'Unknown' so that I can
>>>>> still send the events through the HDFS sink but I can also find them later
>>>>> if need be.
>>>>>
>>>>> hope that helps,
>>>>> Paul Chavez
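
The "only add the header if it doesn't exist" behaviour described in this message can be sketched in plain Java. A `Map` stands in for the event headers. The class and method names are hypothetical, and `customer` is a made-up example of a custom header used in a tokenized path. This does not use Flume's interceptor classes.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: default headers are added without overwriting real ones.
final class DefaultHeaderSketch {
    // Adds key=value only when the header is absent ("preserve existing").
    static void addIfMissing(Map<String, String> headers, String key, String value) {
        headers.putIfAbsent(key, value);
    }

    // Returns a copy of the headers with fallbacks for everything a
    // tokenized HDFS path would need.
    static Map<String, String> withDefaults(Map<String, String> headers, long fallbackMillis) {
        Map<String, String> out = new HashMap<>(headers);
        // A fixed past date sends corrupt events to one predictable directory.
        addIfMissing(out, "timestamp", Long.toString(fallbackMillis));
        // "customer" is a hypothetical custom header.
        addIfMissing(out, "customer", "Unknown");
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> good = new HashMap<>();
        good.put("timestamp", "1375349784000");
        System.out.println(withDefaults(good, 0L));
        System.out.println(withDefaults(new HashMap<>(), 0L));
    }
}
```

Events that arrive with a real timestamp keep it, while events without one land under the fallback date and the 'Unknown' value, so they still pass through the sink and can be located afterwards.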
>>>>>
>>>>>  ------------------------------
>>>>> *From:* Roshan Naik [mailto:roshan@hortonworks.com]
>>>>> *Sent:* Thursday, August 01, 2013 10:27 AM
>>>>> *To:* user@flume.apache.org
>>>>> *Subject:* Re: Problem Events
>>>>>
>>>>> some questions:
>>>>> - why is the sink unable to consume the event ?
>>>>> - how would you like to identify such an event ? by examining its
>>>>> content ? or by the fact that it's ping-pong-ing between channel and sink ?
>>>>> - what would you prefer to do with such an event ? merely drop it ?
>>>>>
>>>>>
>>>>> On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <
>>>>> jeremykarlson@gmail.com> wrote:
>>>>>
>>>>>> To my knowledge (which is admittedly limited), there is no way to
>>>>>> deal with these in a way that will make your day.  I'm happy if someone can
>>>>>> say otherwise.
>>>>>>
>>>>>> This is very similar to a problem I had a week or two ago.  I fixed
>>>>>> it by restarting Flume with debugging on, connecting to it with the
>>>>>> debugger, and finding the message in the sink.  Discovered a bug in the sink.
>>>>>> Downloaded Flume, fixed bug, recompiled, installed custom version, etc.
>>>>>>
>>>>>> I agree that this is not a practical solution, and I still believe
>>>>>> that Flume needs some sort of "sink of last resort" option or something,
>>>>>> like JMS implementations.
>>>>>>
>>>>>> -- Jeremy
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <anat@viber.com> wrote:
>>>>>>
>>>>>>> The message is already in the channel.
>>>>>>> Is there a way to write an interceptor to work after the channel? or
>>>>>>> before the sink?
>>>>>>>
>>>>>>> The only thing I found is to stop everything and delete the channel
>>>>>>> files, but I won't be able to use this approach in production :-(
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <paliwalashish@gmail.com> wrote:
>>>>>>>
>>>>>>>> On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <anat@viber.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'm having the same problem with HDFS sink.
>>>>>>>>>
>>>>>>>>> A 'poison' message which doesn't have timestamp header in it as
>>>>>>>>> the sink expects.
>>>>>>>>> This causes an NPE which ends in returning the message to the
>>>>>>>>> channel, over and over again.
>>>>>>>>>
>>>>>>>>> Is my only option to re-write the HDFS sink?
>>>>>>>>> Isn't there any way to intercept in the sink work?
>>>>>>>>>
>>>>>>>>
>>>>>>>> You can write a custom interceptor and remove/modify the poison
>>>>>>>> message.
>>>>>>>>
>>>>>>>> Interceptors are called before the message makes its way into the
>>>>>>>> channel.
>>>>>>>>
>>>>>>>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>>>>>>>
>>>>>>>> I wrote a blog about it a while back
>>>>>>>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Anat
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <
>>>>>>>>> arvind@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Sounds like a bug in ElasticSearch sink to me. Do you mind filing
>>>>>>>>>> a Jira to track this? Sample data to cause this would be even better.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <
>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> This was using the provided ElasticSearch sink.  The logs were
>>>>>>>>>>> not helpful.  I ran it through with the debugger and found the source of
>>>>>>>>>>> the problem.
>>>>>>>>>>>
>>>>>>>>>>> ContentBuilderUtil uses a very "aggressive" method to determine
>>>>>>>>>>> if the content is JSON; if it contains a "{" anywhere in it, it's
>>>>>>>>>>> considered JSON.  My body contained that but wasn't JSON, causing the JSON
>>>>>>>>>>> parser to throw a CharConversionException from addComplexField(...) (but
>>>>>>>>>>> not the expected JSONException).  We've changed addComplexField(...) to
>>>>>>>>>>> catch different types of exceptions and fall back to treating it as a
>>>>>>>>>>> simple field.  We'll probably submit a patch for this soon.
>>>>>>>>>>>
>>>>>>>>>>> I'm reasonably happy with this, but I still think that in the
>>>>>>>>>>> bigger picture there should be some sort of mechanism to automatically
>>>>>>>>>>> detect and toss / skip / flag problematic events without them plugging up
>>>>>>>>>>> the flow.
>>>>>>>>>>>
>>>>>>>>>>> -- Jeremy
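
The fallback described in this message (try the JSON path, and treat the body as a simple field if the parser fails in any way) can be sketched as follows. This is not the ElasticSearch sink's code or the patch mentioned above: the class, the `Parser` interface, and the `complex:` / `simple:` result strings are hypothetical stand-ins.

```java
import java.io.CharConversionException;

// Illustrative only: shows a broad-catch fallback around a loose JSON check.
final class FieldFallbackSketch {
    // Hypothetical parser hook that may throw any checked exception.
    interface Parser {
        String parse(String body) throws Exception;
    }

    // The loose heuristic from the message: any "{" makes the body look like JSON.
    static boolean looksLikeJson(String body) {
        return body.contains("{");
    }

    // Tries the complex (JSON) path first; on any failure from the parser,
    // falls back to treating the body as a simple string field.
    static String buildField(String body, Parser jsonParser) {
        if (looksLikeJson(body)) {
            try {
                return "complex:" + jsonParser.parse(body);
            } catch (Exception e) {
                // Catch broadly: the parser may not throw the type we expect.
            }
        }
        return "simple:" + body;
    }

    public static void main(String[] args) {
        Parser failing = body -> { throw new CharConversionException("not JSON"); };
        System.out.println(buildField("user typed { by mistake", failing));
    }
}
```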
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <
>>>>>>>>>>> arvind@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Jeremy, would it be possible for you to show us logs for the
>>>>>>>>>>>> part where the sink fails to remove an event from the channel? I am
>>>>>>>>>>>> assuming this is a standard sink that Flume provides and not a custom one.
>>>>>>>>>>>>
>>>>>>>>>>>> The reason I ask is because sinks do not introspect the event,
>>>>>>>>>>>> and hence there is no reason why it will fail during the event's removal.
>>>>>>>>>>>> It is more likely that there is a problem within the channel in that it
>>>>>>>>>>>> cannot dereference the event correctly. Looking at the logs will help us
>>>>>>>>>>>> identify the root cause for what you are experiencing.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <
>>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Both reasonable suggestions.  What would a custom sink look
>>>>>>>>>>>>> like in this case, and how would I only eliminate the problem events since
>>>>>>>>>>>>> I don't know what they are until they are attempted by the "real" sink?
>>>>>>>>>>>>>
>>>>>>>>>>>>> My philosophical concern (in general) is that we're taking the
>>>>>>>>>>>>> approach of exhaustively finding and eliminating possible failure cases.
>>>>>>>>>>>>> It's not possible to eliminate every single failure case, so shouldn't
>>>>>>>>>>>>> there be a method of last resort to eliminate problem events from the
>>>>>>>>>>>>> channel?
>>>>>>>>>>>>>
>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <
>>>>>>>>>>>>> hshreedharan@cloudera.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Or you could write a custom sink that removes this event
>>>>>>>>>>>>>> (more work of course)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Hari
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> if you have a way to identify such events.. you may be
>>>>>>>>>>>>>> able to use the Regex interceptor to toss them out before they get into the
>>>>>>>>>>>>>> channel.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <
>>>>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi everyone.  My Flume adventures continue.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm in a situation now where I have a channel that's filling
>>>>>>>>>>>>>> because a stubborn message is stuck.  The sink won't accept it (for
>>>>>>>>>>>>>> whatever reason; I can go into detail but that's not my point here).  This
>>>>>>>>>>>>>> just blocks up the channel entirely, because it goes back into the channel
>>>>>>>>>>>>>> when the sink refuses.  Obviously, this isn't ideal.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal with
>>>>>>>>>>>>>> these situations.  Things that come to mind might be:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>>>>>>>> 2. After n attempts, send the event to a "problem area"
>>>>>>>>>>>>>> (maybe a different source / sink / channel?)  that someone can look at
>>>>>>>>>>>>>> later.
>>>>>>>>>>>>>> 3. Some sort of mechanism that allows operators to manually
>>>>>>>>>>>>>> kill these messages.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> thanks
>>>>>>>> ashish
>>>>>>>>
>>>>>>>> Blog: http://www.ashishpaliwal.com/blog
>>>>>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
