flume-user mailing list archives

From Connor Woodson <cwoodson....@gmail.com>
Subject Re: "single source - multi channel" scenario and applying interceptor while writing to only one channel and not on others...possible approaches
Date Fri, 26 Apr 2013 05:15:39 GMT
Jagadish,

You are right. Your problem seems to be more about treating your events
differently depending on the sink, and that is what I believe Serializers
are best at. Here are some directions/advice for creating a serializer (if
you search the list archives for the 'custom serializer' thread you will
find another set of directions that may also be useful):

1. I find the best place to start is generally pre-existing code.
BodyTextEventSerializer (the default serializer for the HDFS sink / file
sink if none is defined) and HeaderAndBodyTextEventSerializer (at this
link <https://github.com/apache/flume/tree/trunk/flume-ng-core/src/main/java/org/apache/flume/serialization>)
are the two basic serializers. They do almost exactly the same thing, so
you really only need to look at one of them. Of all the files at that link,
the only other serializer is FlumeEventAvroEventSerializer (these names are
all mouthfuls...). One thing of note is that none of these example
serializers implements the configure method; look at
AbstractAvroEventSerializer to see that method implemented.

2. When you copy one of those files, be sure to change the class name (and
probably the package name), the constructor, and the builder class at the
bottom. This builder class is what is used to create and configure the
serializer. Generally you create the serializer with
"EventSerializer s = new MyEventSerializer(out);" and then configure it
with "s.configure(context);", or at least that's how I do it. It appears
that BodyTextEventSerializer configures itself in its constructor; either
way is valid I suppose. The thing to note, however, is that
Builder.build(...) is what is called to create an instance of your
serializer.

3. The main method is write(Event e): this method is given an event, and
you are expected to write the contents of that event in some way to the
output stream that the serializer was created with. After write(...) is
called, or maybe after a few writes in a row, flush() will be called; I've
never needed to do anything in that function.

4. Some details on the other functions: supportsReopen() should return
'true' unless there is a reason for it to return 'false'. I believe this
function is only used in the HDFS Sink writers, where it is checked to make
sure a serializer is able to append to an existing stream
(here <https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java#L98> and
here <https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java#L85> is
the code relating to this). afterCreate is called when a serializer is
created; afterReopen is called instead if the serializer is appending (the
previous links use this; I don't think afterReopen is called anywhere
else). beforeClose is called just before the stream is closed, and after a
stream is closed the serializer should be discarded (set the serializer
variable to null).

5. For my serializers, as I mentioned, I implement the configure method
instead of using the constructor the way the Body one does. I don't do
much of anything in the other functions; 'write' is where the meat of the
code goes. For your case, the way to go about it would be, in your write
method, to use a RegEx (or whatever else you want) to pull apart your event
into its various fields, and then write a subset of those fields to the
output stream in one way or another.
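
As a rough illustration of step 5, here is a minimal sketch of the logic
that could sit inside write(Event e). It deliberately avoids the Flume
classes so that it compiles on its own: the class name FieldSubsetWriter,
the method writeSubset, and the "<timestamp> <level> <message>" event
layout are all made up for this example. In a real serializer the byte
array would presumably come from the event's body and the stream would be
the one the serializer was created with.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper showing the "pull apart, write a subset" idea.
 * Not part of Flume; names and event layout are assumptions.
 */
final class FieldSubsetWriter {
    // Assumed event layout: "<timestamp> <level> <message>"
    private static final Pattern LINE =
        Pattern.compile("^(\\S+)\\s+(\\S+)\\s+(.*)$");

    private FieldSubsetWriter() {}

    /** Writes only the level and message fields, dropping the timestamp. */
    static void writeSubset(byte[] body, OutputStream out) throws IOException {
        String line = new String(body, StandardCharsets.UTF_8);
        Matcher m = LINE.matcher(line);
        if (m.matches()) {
            // Keep fields 2 and 3, separated by a tab.
            String kept = m.group(2) + "\t" + m.group(3);
            out.write(kept.getBytes(StandardCharsets.UTF_8));
        } else {
            // Events that do not fit the layout pass through unchanged.
            out.write(body);
        }
        out.write('\n');
    }
}
```

Passing unparseable events through unchanged is just one choice; dropping
them or writing them to a separate place would be equally reasonable
depending on what the sink's consumers expect.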

I believe that's about it; if anything's unclear, I'll be more than happy
to fix it up.

An important note about the builder, which I tried to cover above: when you
supply the FQCN of your custom serializer for the HDFS sink's
"agent.sinks.<sink>.serializer" property (the file sink uses
"sink.serializer"), you supply the FQCN of the Builder inner class, so it
looks like this: com.connor.MySerializer$Builder
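
For illustration, a sink definition using such a builder might look roughly
like the fragment below. This assumes the HDFS sink, which reads the
builder class name from its "serializer" key; the agent, sink and channel
names, the path, and the "fieldsToKeep" key are placeholders invented for
this example. Keys under "serializer." should be handed to the builder's
Context, but check the user guide for your Flume version.

```properties
# Placeholder names: agent, hdfsSink, ch1, and the HDFS path are made up.
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.channel = ch1
agent.sinks.hdfsSink.hdfs.path = /flume/events
# Serializers apply to the stream file types, not the default SequenceFile.
agent.sinks.hdfsSink.hdfs.fileType = DataStream
# FQCN of the Builder inner class, as described above.
agent.sinks.hdfsSink.serializer = com.connor.MySerializer$Builder
# Hypothetical custom setting, read by the serializer's configure method.
agent.sinks.hdfsSink.serializer.fieldsToKeep = level,message
```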

And here <https://github.com/apache/flume/blob/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst#installing-third-party-plugins> is
some documentation on the best way to include your custom serializer in
Flume.

- Connor


On Tue, Apr 23, 2013 at 7:15 AM, Israel Ekpo <israel@aicer.org> wrote:

> Connor,
>
> This is a great example.
>
> Thank you for sharing this. It was an excellent tutorial.
>
> I will create a JIRA issue to document this workaround in the user guide.
>
>
>
>
> On 23 April 2013 02:52, Connor Woodson <cwoodson.dev@gmail.com> wrote:
>
>> Some more thoughts on this:
>>
>> The way Interceptors are currently set to work is that they apply to an
>> event as it is received. There are good uses for this - for instance, it
>> allows easily configuring a single Timestamp interceptor that gives all
>> events a source receives a timestamp, so even if you have multiple
>> sinks/channels responding to an event, you only have that one interceptor.
>> Interceptors in this sense serve to add data to event headers, and as such
>> it makes sense to have them applied only once by the source instead of
>> letting the channels change header data.
>>
>> If you wish to use an interceptor in the above way, to modify header
>> data, and still want that interceptor to apply for a single channel, then
>> if you don't mind could you elaborate on what you are trying to do? I
>> haven't been able to come up with a situation like that. The solution here
>> would be to do as Jeff suggested and use a serializer; if you want more
>> in-depth instructions on how to build it, please ask; I have a set of
>> directions lying around somewhere that I'll find for you.
>>
>>
>> However, given the way Interceptors work, I have myself faced a situation
>> where I would like the interceptors to be per-channel. This use case is when I
>> want to use an Interceptor to filter events; I want to send an event to
>> some subset of channels based on the contents of its data. Here is how you
>> can do this in the current setup (where Interceptors are applied at the
>> source instead of per-channel):
>>
>> Using the Multiplexing Channel Selector you are able to choose which
>> channels an event is written to based off of the value of a specified
>> header (documentation in that link). There are some more features to the
>> selector that aren't documented, called Optional Channels or something, but
>> I don't know very much about them - just figured I would point out that
>> they exist; digging through the source should provide some more insight.
>>
>> So here is how you want to set your system up. Create an Interceptor that
>> will define a certain header value based off of the event's contents. For
>> instance, if you want all events containing exactly 1 character to be sent
>> to a channel, you could create an Interceptor that counts the characters in
>> the event. Then that Interceptor will set a certain header value to
>> "SINGLE" if there is just one character, or "MULTIPLE" if there are more.
>>
>> Then you can create your channel selector like this (modified from the
>> documentation example):
>>
>> a1.sources = r1
>> a1.channels = all_events single_events multiple_events
>> a1.sources.r1.interceptors = your_interceptor
>> a1.sources.r1.interceptors.your_interceptor.header = header
>> a1.sources.r1.selector.type = multiplexing
>> a1.sources.r1.selector.header = header
>> a1.sources.r1.selector.mapping.SINGLE = all_events single_events
>> a1.sources.r1.selector.mapping.MULTIPLE = all_events multiple_events
>> a1.sources.r1.selector.default = all_events
>>
>>
>> The result is that now you have created a way to filter which channels a
>> certain event is sent to. Note that a channel can appear more than once -
>> for instance, all_events will get all events. And so the trick is to just
>> define the right interceptor (which is much simpler to code than a
>> serializer, which itself is fairly easy).
>>
>> Hopefully that was clear. Feel free to ask more questions,
>>
>> - Connor
>>
>>
>>
>> On Fri, Apr 19, 2013 at 11:14 AM, Jeff Lord <jlord@cloudera.com> wrote:
>>
>>> Jagadish,
>>>
>>> Here is an example of how to write a custom serializer.
>>>
>>>
>>> https://github.com/apache/flume/blob/trunk/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomSerializer.java
>>>
>>> -Jeff
>>>
>>>
>>> On Fri, Apr 19, 2013 at 9:34 AM, Jeff Lord <jlord@cloudera.com> wrote:
>>>
>>>> Hi Jagadish,
>>>>
>>>> Have you considered using a custom event serializer to modify your
>>>> event?
>>>> It's possible to replicate your flow using two channels and then have
>>>> one sink that implements a custom serializer to modify the event.
>>>>
>>>> -Jeff
>>>>
>>>>
>>>> On Tue, Apr 16, 2013 at 11:12 PM, Jagadish Bihani <
>>>> jagadish.bihani@pubmatic.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> If anybody has any inputs on this that will surely help.
>>>>>
>>>>> Regards,
>>>>> Jagadish
>>>>>
>>>>>
>>>>> On 04/16/2013 12:06 PM, Jagadish Bihani wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> We have a use case in which
>>>>>> 1. spooling source reads data.
>>>>>> 2. It needs to write events into multiple channels. It should apply
>>>>>> interceptor only when putting into one channel and should put
>>>>>> the event as it is while putting into another channel.
>>>>>>
>>>>>> Possible approach we have thought:
>>>>>>
>>>>>> 1. Create 2 different sources and then apply interceptor on one and
>>>>>> don't apply on other. But that duplicates reads and increases IO.
>>>>>>
>>>>>> Is there any better way of achieving this use case?
>>>>>>
>>>>>> Regards,
>>>>>> Jagadish
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
