flume-user mailing list archives

From Connor Woodson <cwoodson....@gmail.com>
Subject Re: Problem Events
Date Sat, 10 Aug 2013 01:08:32 GMT
To avoid the cooldown period, set maxBackoff to 0; the failover sink
processor should probably have better logic around this, but what happens
instead just takes a little more processing time:

an event fails -> that sink is put on the failover list -> the event goes
to the next sink and succeeds.
next event -> the failover processor checks the failover list, removes the
first sink from that list -> the event goes to the first sink.

I am fairly confident that's how it works.
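
For reference, a minimal sink-group config along those lines (agent and
sink names are placeholders; I believe the property the shipped failover
processor actually exposes is maxpenalty, the maximum cooldown in millis):

  a1.sinkgroups = g1
  a1.sinkgroups.g1.sinks = k1 k2
  a1.sinkgroups.g1.processor.type = failover
  # higher number = higher priority, so k1 is tried first
  a1.sinkgroups.g1.processor.priority.k1 = 10
  a1.sinkgroups.g1.processor.priority.k2 = 5
  # 0 effectively disables the cooldown period
  a1.sinkgroups.g1.processor.maxpenalty = 0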

- Connor


On Wed, Aug 7, 2013 at 10:26 PM, Anat Rozenzon <anat@viber.com> wrote:

> That's what I'll do: add a static timestamp of 1 and let all the 'bad'
> messages flow into one directory.
> Thanks
>
>
> On Wed, Aug 7, 2013 at 5:14 PM, Jonathan Cooper-Ellis <jce@cloudera.com> wrote:
>
>> You can use a Static Interceptor before the RegexExtractor to add a
>> timestamp of zero to the header, which can then be overwritten by the
>> proper timestamp (if it exists). That should also sink misses into an
>> obvious 'miss' directory.
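>>
>> A minimal sketch of that interceptor chain (agent/source names and the
>> regex/date format are placeholders for whatever your logs look like):
>>
>>   a1.sources.r1.interceptors = i1 i2
>>   # i1 runs first and stamps every event with timestamp=0
>>   a1.sources.r1.interceptors.i1.type = static
>>   a1.sources.r1.interceptors.i1.key = timestamp
>>   a1.sources.r1.interceptors.i1.value = 0
>>   a1.sources.r1.interceptors.i1.preserveExisting = false
>>   # i2 overwrites it with the real timestamp when the line matches
>>   a1.sources.r1.interceptors.i2.type = regex_extractor
>>   a1.sources.r1.interceptors.i2.regex = ^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})
>>   a1.sources.r1.interceptors.i2.serializers = s1
>>   a1.sources.r1.interceptors.i2.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
>>   a1.sources.r1.interceptors.i2.serializers.s1.name = timestamp
>>   a1.sources.r1.interceptors.i2.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss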
>>
>>
>> On Tue, Aug 6, 2013 at 10:40 PM, Anat Rozenzon <anat@viber.com> wrote:
>>
>>> After some reading in the docs, I think the existing fail-over behavior
>>> can't be used to solve the 'poison' message problem, as it puts the
>>> 'failed' sink in a 'cooldown' period.
>>> As the problem is in the message and not the sink, after a poison
>>> message has arrived the HDFS sink will 'fail', and thus the next X
>>> messages will go to the failover sink.
>>> My only solution for now is to avoid my current problem and hope that I
>>> won't have any other problematic messages; I'd be glad to have a less
>>> fragile solution.
>>>
>>> Many thanks!
>>> Other than that, Flume looks like a great tool :-)
>>>
>>> Anat
>>>
>>>
>>> On Sun, Aug 4, 2013 at 8:45 AM, Anat Rozenzon <anat@viber.com> wrote:
>>>
>>>> I think using a fail-over processor is a very good idea; I'll use it
>>>> as an immediate solution.
>>>> For the long run, I would like to see a general solution (not specific
>>>> to the file channel; in my case the sink is HDFS), so the suggestion
>>>> to add a 'Poison Message' sink to the sink processor sounds good.
>>>>
>>>> Just FYI, my problem was that a log file going through my source did
>>>> not have the structure I expected in all rows.
>>>>
>>>> Since I used the regexp extractor to set the timestamp, the 'bad' row
>>>> didn't match the regexp, the timestamp was not set, and the HDFS sink
>>>> threw an NPE on that:
>>>> 01 Aug 2013 09:36:24,259 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:422)  - process failed
>>>> java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>>>>         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>>>>         at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>>>>         at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>>>>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>>>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>>         at java.lang.Thread.run(Thread.java:722)
>>>> 01 Aug 2013 09:36:24,262 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
>>>> org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>>>>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:426)
>>>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>>         at java.lang.Thread.run(Thread.java:722)
>>>> Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
>>>>         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>>>>         at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>>>>         at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>>>>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>>>>         ... 3 more
>>>>
>>>>
>>>> I've fixed my regexp now; still, I can never be sure all the log files
>>>> the system creates will be perfect, without any bad lines.
>>>>
>>>>
>>>> On Sat, Aug 3, 2013 at 9:56 AM, Connor Woodson <cwoodson.dev@gmail.com> wrote:
>>>>
>>>>> Just some more thoughts. It could be even easier:
>>>>>
>>>>> For the failover sink processor, you could have settings like "max
>>>>> attempts" and "time between attempts", and it would just try one
>>>>> event X times before it gives up and sends it to the next sink. The
>>>>> time between attempts could even back off if needed.
>>>>>
>>>>> The advantage of this is that it preserves the ordering of events,
>>>>> something which gets completely broken in the previous scenario.
>>>>>
>>>>> - Connor
>>>>>
>>>>>
>>>>> On Fri, Aug 2, 2013 at 6:27 PM, Connor Woodson <cwoodson.dev@gmail.com> wrote:
>>>>>
>>>>>> As another option to solve the problem of having a bad event in a
>>>>>> channel: using a fail-over sink processor, log all bad events to a
>>>>>> local file. And to be extra cautious, add a third failover of a null
>>>>>> sink. This will mean that events will always flow through your
>>>>>> channel. The file sink should almost never fail, so you shouldn't be
>>>>>> losing events in the process. And then you can re-process everything
>>>>>> in the file if you still want those events for something.
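>>>>>>
>>>>>> A rough sketch of that topology (names and the path are placeholders;
>>>>>> channel assignments omitted):
>>>>>>
>>>>>>   a1.sinkgroups = g1
>>>>>>   a1.sinkgroups.g1.sinks = hdfs1 local1 null1
>>>>>>   a1.sinkgroups.g1.processor.type = failover
>>>>>>   a1.sinkgroups.g1.processor.priority.hdfs1 = 10
>>>>>>   a1.sinkgroups.g1.processor.priority.local1 = 5
>>>>>>   a1.sinkgroups.g1.processor.priority.null1 = 1
>>>>>>   # second chance: failed events land in a local file for re-processing
>>>>>>   a1.sinks.local1.type = file_roll
>>>>>>   a1.sinks.local1.sink.directory = /var/log/flume/failed-events
>>>>>>   # last resort: discard, so the channel never blocks
>>>>>>   a1.sinks.null1.type = null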
>>>>>>
>>>>>> For the system of having Flume detect bad events, I think
>>>>>> implementing something like the above is better than discarding
>>>>>> events that fail X times. For instance, if you have an Avro sink ->
>>>>>> Avro source, and you're restarting your source, Flume would end up
>>>>>> discarding events unnecessarily. Instead, how about implementing the
>>>>>> above system and then going a step further: Flume will attempt to
>>>>>> re-send the bad events itself. And then if a bad event isn't able to
>>>>>> be sent after X attempts, it can be discarded.
>>>>>>
>>>>>> I envision this system as an extension to the current File Channel;
>>>>>> when an event fails, it is written to a secondary File Channel from
>>>>>> which events can be pulled when the main channel isn't in use. It
>>>>>> would add headers like "lastAttempt" and "numberOfAttempts" to
>>>>>> events. Then it can be configurable for a "min time between attempts"
>>>>>> and "maximum attempts." When an event fails a second time, those
>>>>>> headers are updated and it goes back into the fail-channel. If it
>>>>>> comes out of the fail-channel but the lastAttempt is too recent, it
>>>>>> goes back in. If it fails more times than the maximum, it is written
>>>>>> to a final location (perhaps it's just sent to another sink; maybe
>>>>>> this would have to be in a sink processor). Assuming all of those
>>>>>> steps are error-free, then all messages are preserved, and the
>>>>>> badly-formatted ones eventually get stored somewhere else. (This
>>>>>> system could be hacked together with current code - fail over sink
>>>>>> processor -> avro sink -> avro source on the same instance, but
>>>>>> that's a little too hacky.)
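>>>>>>
>>>>>> To make that concrete, the retry decision might look roughly like
>>>>>> this (entirely hypothetical - failChannel, lastResortSink, the
>>>>>> limits, and the header names don't exist in Flume today):
>>>>>>
>>>>>>   Event e = failChannel.take();
>>>>>>   Map<String, String> h = e.getHeaders();
>>>>>>   long last  = Long.parseLong(h.getOrDefault("lastAttempt", "0"));
>>>>>>   int  tries = Integer.parseInt(h.getOrDefault("numberOfAttempts", "0"));
>>>>>>   if (System.currentTimeMillis() - last < minTimeBetweenAttempts) {
>>>>>>     failChannel.put(e);         // too recent - back into the fail-channel
>>>>>>   } else if (tries >= maxAttempts) {
>>>>>>     lastResortSink.process(e);  // give up; park it somewhere inspectable
>>>>>>   } else {
>>>>>>     h.put("lastAttempt", String.valueOf(System.currentTimeMillis()));
>>>>>>     h.put("numberOfAttempts", String.valueOf(tries + 1));
>>>>>>     retryOnPrimarySink(e);      // hand back to the main sink
>>>>>>   }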
>>>>>>
>>>>>> Just some thoughts.
>>>>>>
>>>>>> - Connor
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <arvind@apache.org> wrote:
>>>>>>
>>>>>>> This sounds like a critical problem that can cause pipelines to
>>>>>>> block permanently. If you find yourself in this situation, a
>>>>>>> possible workaround would be to decommission the channel, remove its
>>>>>>> data, and route the flow through a new empty channel. If you can
>>>>>>> identify which component is causing the problem, see if you can
>>>>>>> remove it temporarily to let the problem events pass through another
>>>>>>> peer component.
>>>>>>>
>>>>>>> I have also created FLUME-2140 [1], which will eventually allow the
>>>>>>> pipelines to identify and divert such bad events. If you have any
>>>>>>> logs, data, or configurations that can be shared and will help
>>>>>>> provide more details for this problem, it would be great if you
>>>>>>> could attach them to this jira and provide your comments.
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLUME-2140
>>>>>>>
>>>>>>> Regards,
>>>>>>> Arvind Prabhakar
>>>>>>>
>>>>>>> On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <pchavez@verticalsearchworks.com> wrote:
>>>>>>>
>>>>>>>> There's no way to deal with a bad event once it's in the channel,
>>>>>>>> but you can mitigate future issues by having a timestamp
>>>>>>>> interceptor bound to the source feeding the channel. There is a
>>>>>>>> parameter 'preserveExisting' that will only add the header if it
>>>>>>>> doesn't already exist. If you don't want 'bad' time data in there,
>>>>>>>> you could try a static interceptor with a specific past date so
>>>>>>>> that corrupt events fall into a deterministic path in HDFS.
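>>>>>>>>
>>>>>>>> Something like this, for the timestamp case (agent/source names are
>>>>>>>> placeholders):
>>>>>>>>
>>>>>>>>   a1.sources.r1.interceptors = ts
>>>>>>>>   a1.sources.r1.interceptors.ts.type = timestamp
>>>>>>>>   # only add the header when it isn't already set
>>>>>>>>   a1.sources.r1.interceptors.ts.preserveExisting = true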
>>>>>>>>
>>>>>>>> I use this technique to prevent stuck events for both timestamp
>>>>>>>> headers as well as some of our own custom headers we use for
>>>>>>>> tokenized paths. The static interceptor will insert an arbitrary
>>>>>>>> header if it doesn't exist, so I have a couple that put in the
>>>>>>>> value 'Unknown' so that I can still send the events through the
>>>>>>>> HDFS sink but I can also find them later if need be.
>>>>>>>>
>>>>>>>> hope that helps,
>>>>>>>> Paul Chavez
>>>>>>>>
>>>>>>>>  ------------------------------
>>>>>>>> From: Roshan Naik [mailto:roshan@hortonworks.com]
>>>>>>>> Sent: Thursday, August 01, 2013 10:27 AM
>>>>>>>> To: user@flume.apache.org
>>>>>>>> Subject: Re: Problem Events
>>>>>>>>
>>>>>>>> Some questions:
>>>>>>>> - why is the sink unable to consume the event?
>>>>>>>> - how would you like to identify such an event? By examining its
>>>>>>>> content, or by the fact that it's ping-pong-ing between channel and
>>>>>>>> sink?
>>>>>>>> - what would you prefer to do with such an event? Merely drop it?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <jeremykarlson@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> To my knowledge (which is admittedly limited), there is no way
>>>>>>>>> to deal with these in a way that will make your day. I'm happy if
>>>>>>>>> someone can say otherwise.
>>>>>>>>>
>>>>>>>>> This is very similar to a problem I had a week or two ago. I
>>>>>>>>> fixed it by restarting Flume with debugging on, connecting to it
>>>>>>>>> with the debugger, and finding the message in the sink. I
>>>>>>>>> discovered a bug in the sink, downloaded Flume, fixed the bug,
>>>>>>>>> recompiled, installed the custom version, etc.
>>>>>>>>>
>>>>>>>>> I agree that this is not a practical solution, and I still believe
>>>>>>>>> that Flume needs some sort of "sink of last resort" option or
>>>>>>>>> something, like JMS implementations have.
>>>>>>>>>
>>>>>>>>> -- Jeremy
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <anat@viber.com> wrote:
>>>>>>>>>
>>>>>>>>>> The message is already in the channel.
>>>>>>>>>> Is there a way to write an interceptor that works after the
>>>>>>>>>> channel, or before the sink?
>>>>>>>>>>
>>>>>>>>>> The only thing I found is to stop everything and delete the
>>>>>>>>>> channel files, but I won't be able to use that approach in
>>>>>>>>>> production :-(
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <paliwalashish@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <anat@viber.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>   Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm having the same problem with HDFS sink.
>>>>>>>>>>>>
>>>>>>>>>>>> A 'poison' message which doesn't have the timestamp header in
>>>>>>>>>>>> it that the sink expects.
>>>>>>>>>>>> This causes an NPE, which ends in returning the message to the
>>>>>>>>>>>> channel, over and over again.
>>>>>>>>>>>>
>>>>>>>>>>>> Is my only option to re-write the HDFS sink?
>>>>>>>>>>>> Isn't there any way to intercept the sink's work?
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> You can write a custom interceptor and remove/modify the
>>>>>>>>>>> poison message.
>>>>>>>>>>>
>>>>>>>>>>> Interceptors are called before a message makes its way into the
>>>>>>>>>>> channel.
>>>>>>>>>>>
>>>>>>>>>>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>>>>>>>>>>
>>>>>>>>>>> I wrote a blog about it a while back
>>>>>>>>>>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
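>>>>>>>>>>>
>>>>>>>>>>> A bare-bones skeleton for this case (class and header names are
>>>>>>>>>>> just an example; it drops any event missing the timestamp
>>>>>>>>>>> header):
>>>>>>>>>>>
>>>>>>>>>>>   import java.util.ArrayList;
>>>>>>>>>>>   import java.util.List;
>>>>>>>>>>>   import org.apache.flume.Context;
>>>>>>>>>>>   import org.apache.flume.Event;
>>>>>>>>>>>   import org.apache.flume.interceptor.Interceptor;
>>>>>>>>>>>
>>>>>>>>>>>   public class DropPoisonInterceptor implements Interceptor {
>>>>>>>>>>>     @Override public void initialize() { }
>>>>>>>>>>>     @Override public void close() { }
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public Event intercept(Event event) {
>>>>>>>>>>>       // returning null drops the event before it reaches the channel
>>>>>>>>>>>       return event.getHeaders().containsKey("timestamp") ? event : null;
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public List<Event> intercept(List<Event> events) {
>>>>>>>>>>>       List<Event> out = new ArrayList<Event>();
>>>>>>>>>>>       for (Event e : events) {
>>>>>>>>>>>         Event kept = intercept(e);
>>>>>>>>>>>         if (kept != null) { out.add(kept); }
>>>>>>>>>>>       }
>>>>>>>>>>>       return out;
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     public static class Builder implements Interceptor.Builder {
>>>>>>>>>>>       @Override public void configure(Context context) { }
>>>>>>>>>>>       @Override public Interceptor build() {
>>>>>>>>>>>         return new DropPoisonInterceptor();
>>>>>>>>>>>       }
>>>>>>>>>>>     }
>>>>>>>>>>>   }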
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Anat
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <arvind@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Sounds like a bug in the ElasticSearch sink to me. Do you
>>>>>>>>>>>>> mind filing a Jira to track this? Sample data to cause this
>>>>>>>>>>>>> would be even better.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This was using the provided ElasticSearch sink. The logs
>>>>>>>>>>>>>> were not helpful. I ran it through with the debugger and
>>>>>>>>>>>>>> found the source of the problem.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ContentBuilderUtil uses a very "aggressive" method to
>>>>>>>>>>>>>> determine if the content is JSON: if it contains a "{"
>>>>>>>>>>>>>> anywhere in it, it's considered JSON. My body contained that
>>>>>>>>>>>>>> but wasn't JSON, causing the JSON parser to throw a
>>>>>>>>>>>>>> CharConversionException from addComplexField(...) (but not
>>>>>>>>>>>>>> the expected JSONException). We've changed
>>>>>>>>>>>>>> addComplexField(...) to catch different types of exceptions
>>>>>>>>>>>>>> and fall back to treating the content as a simple field.
>>>>>>>>>>>>>> We'll probably submit a patch for this soon.
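>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Roughly the shape of the change (names from
>>>>>>>>>>>>>> ContentBuilderUtil, but signatures approximate, not the exact
>>>>>>>>>>>>>> Flume source):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   if (looksLikeJson(data)) {   // currently: body contains a '{'
>>>>>>>>>>>>>>     try {
>>>>>>>>>>>>>>       addComplexField(builder, fieldName, XContentType.JSON, data);
>>>>>>>>>>>>>>     } catch (Exception ex) {
>>>>>>>>>>>>>>       // CharConversionException and friends now land here too;
>>>>>>>>>>>>>>       // fall back to storing the body as a plain string field
>>>>>>>>>>>>>>       addSimpleField(builder, fieldName, data);
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>   } else {
>>>>>>>>>>>>>>     addSimpleField(builder, fieldName, data);
>>>>>>>>>>>>>>   }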
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm reasonably happy with this, but I still think that in
>>>>>>>>>>>>>> the bigger picture there should be some sort of mechanism to
>>>>>>>>>>>>>> automatically detect and toss / skip / flag problematic
>>>>>>>>>>>>>> events without them plugging up the flow.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <arvind@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jeremy, would it be possible for you to show us logs for
>>>>>>>>>>>>>>> the part where the sink fails to remove an event from the
>>>>>>>>>>>>>>> channel? I am assuming this is a standard sink that Flume
>>>>>>>>>>>>>>> provides and not a custom one.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The reason I ask is because sinks do not introspect the
>>>>>>>>>>>>>>> event, and hence there is no reason why it will fail during
>>>>>>>>>>>>>>> the event's removal. It is more likely that there is a
>>>>>>>>>>>>>>> problem within the channel in that it cannot dereference the
>>>>>>>>>>>>>>> event correctly. Looking at the logs will help us identify
>>>>>>>>>>>>>>> the root cause for what you are experiencing.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Both reasonable suggestions. What would a custom sink
>>>>>>>>>>>>>>>> look like in this case, and how would I only eliminate the
>>>>>>>>>>>>>>>> problem events, since I don't know what they are until they
>>>>>>>>>>>>>>>> are attempted by the "real" sink?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> My philosophical concern (in general) is that we're
>>>>>>>>>>>>>>>> taking the approach of exhaustively finding and eliminating
>>>>>>>>>>>>>>>> possible failure cases. It's not possible to eliminate
>>>>>>>>>>>>>>>> every single failure case, so shouldn't there be a method
>>>>>>>>>>>>>>>> of last resort to eliminate problem events from the
>>>>>>>>>>>>>>>> channel?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <hshreedharan@cloudera.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Or you could write a custom sink that removes this event
>>>>>>>>>>>>>>>>> (more work, of course).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Hari
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If you have a way to identify such events, you may be
>>>>>>>>>>>>>>>>> able to use the Regex interceptor to toss them out before
>>>>>>>>>>>>>>>>> they get into the channel.
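>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Something along these lines (agent/source names and the
>>>>>>>>>>>>>>>>> regex itself are placeholders):
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>   a1.sources.r1.interceptors = f1
>>>>>>>>>>>>>>>>>   a1.sources.r1.interceptors.f1.type = regex_filter
>>>>>>>>>>>>>>>>>   a1.sources.r1.interceptors.f1.regex = ^BAD
>>>>>>>>>>>>>>>>>   # drop matching events instead of keeping them
>>>>>>>>>>>>>>>>>   a1.sources.r1.interceptors.f1.excludeEvents = true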
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi everyone. My Flume adventures continue.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm in a situation now where I have a channel that's
>>>>>>>>>>>>>>>>> filling because a stubborn message is stuck. The sink
>>>>>>>>>>>>>>>>> won't accept it (for whatever reason; I can go into detail
>>>>>>>>>>>>>>>>> but that's not my point here). This just blocks up the
>>>>>>>>>>>>>>>>> channel entirely, because it goes back into the channel
>>>>>>>>>>>>>>>>> when the sink refuses. Obviously, this isn't ideal.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal
>>>>>>>>>>>>>>>>> with these situations. Things that come to mind might be:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>>>>>>>>>>> 2. After n attempts, send the event to a "problem area"
>>>>>>>>>>>>>>>>> (maybe a different source / sink / channel?) that someone
>>>>>>>>>>>>>>>>> can look at later.
>>>>>>>>>>>>>>>>> 3. Some sort of mechanism that allows operators to
>>>>>>>>>>>>>>>>> manually kill these messages.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> thanks
>>>>>>>>>>> ashish
>>>>>>>>>>>
>>>>>>>>>>> Blog: http://www.ashishpaliwal.com/blog
>>>>>>>>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
