flume-user mailing list archives

From Henry Ma <henry.ma.1...@gmail.com>
Subject Re: Uncaught Exception When Using Spooling Directory Source
Date Fri, 18 Jan 2013 09:32:19 GMT
Thank you very much, Connor!! It is really HELPFUL.


On Fri, Jan 18, 2013 at 5:13 PM, Connor Woodson <cwoodson.dev@gmail.com> wrote:

> The Spooling Directory Source is best used for sending old data / backups
> through Flume, as opposed to trying to use it for real-time data (because,
> as you discovered, you aren't supposed to write directly to a file in that
> directory, but rather place already-closed files there). You could
> implement what Mike mentioned above about rolling the logs into the
> spooling directory, but there are other options.
>
> If you are looking to pull data in real time, the Exec Source
> <http://flume.apache.org/FlumeUserGuide.html#exec-source> mentioned above
> does work. The one downside is that this source is not the most reliable,
> as mentioned in the red box at that link, and you will have to monitor it
> to make sure it hasn't crashed. However, other than the Spooling Directory
> Source and any custom source you write, this is the only other pulling
> source.
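>
> For reference, here is a minimal sketch of an exec-source agent (the
> agent/channel names and the log path are hypothetical placeholders, and
> the channel and sink are assumed to be defined as usual):
>
> a1.sources = r1
> a1.channels = c1
> # run tail and turn each output line into a Flume event
> a1.sources.r1.type = exec
> a1.sources.r1.command = tail -F /var/log/app/app.log
> a1.sources.r1.channels = c1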
>
> But depending on how your system is set up, you may instead be able to
> push your logs into Flume. Here are some options:
>
> If the log files you want to capture use Log4J, then there is a Log4J
> Appender <http://flume.apache.org/FlumeUserGuide.html#log4j-appender>
> which will send events directly to Flume. The benefit of this is that you
> let Flume take control of the events right as they are generated; they are
> sent through Avro to your specified host/IP, where you will have a Flume
> agent with an Avro Source
> <http://flume.apache.org/FlumeUserGuide.html#flume-sources> running.
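>
> As a rough sketch (the hostname and port here are only examples), the
> application's log4j.properties would carry something like:
>
> log4j.rootLogger = INFO, flume
> log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
> log4j.appender.flume.Hostname = collector.example.com
> log4j.appender.flume.Port = 41414
>
> and the receiving agent would listen with an Avro source:
>
> # accept Avro events from the Log4J appender on port 41414
> a1.sources.r1.type = avro
> a1.sources.r1.bind = 0.0.0.0
> a1.sources.r1.port = 41414
> a1.sources.r1.channels = c1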
>
> Another alternative, if you don't use Log4J but do have direct control
> over the application, is to use the Embedded Flume Agent
> <https://github.com/apache/flume/blob/trunk/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst#embedded-agent>.
> This is even more powerful than the Log4J appender, as you have more
> control over how it works and you are able to use Flume channels with it.
> This would end up pushing events via Avro to your Flume agent to then
> collect/process/store.
>
> There are also a variety of network methods that can communicate with
> Flume. Flume has support for listening on a specified port with the
> Netcat Source <http://flume.apache.org/FlumeUserGuide.html#netcat-source>,
> for receiving events via HTTP POST
> <http://flume.apache.org/FlumeUserGuide.html#http-source> messages, and,
> if your application uses Syslog, that's supported
> <http://flume.apache.org/FlumeUserGuide.html#syslog-sources> as well.
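>
> A netcat listener, for instance, takes only a few lines (the port is an
> arbitrary example); each line written to that port becomes an event:
>
> a1.sources.r1.type = netcat
> a1.sources.r1.bind = 0.0.0.0
> a1.sources.r1.port = 44444
> a1.sources.r1.channels = c1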
>
> In summary: if you need to set up a pulling system, you will need to
> place a Flume agent on each of your servers and have it use a Spooling
> Directory or Exec source; or, if your system is configurable enough, you
> will be able to modify it in one of the ways above to push the logs to
> Flume.
>
> I hope some of that was helpful,
>
> - Connor
>
>
> On Fri, Jan 18, 2013 at 12:18 AM, Henry Ma <henry.ma.1986@gmail.com> wrote:
>
>> We have an advertising system with hundreds of servers running services
>> such as Resin/Nginx, and each of them writes log files to a local
>> location every second. What we need is to collect all the log files
>> promptly into central storage such as MooseFS for real-time analysis,
>> and then archive them to HDFS every hour.
>>
>> We want to deploy Flume to collect the log files, as soon as they are
>> generated, from nearly one hundred servers (servers may be added or
>> removed at any time) to a central location, and then archive them to
>> HDFS each hour.
>>
>> For now the log files cannot be pushed to any collecting system; we want
>> the collecting system to PULL all of them remotely.
>>
>> Can you give me some guide? Thanks!
>>
>>
>> On Fri, Jan 18, 2013 at 3:45 PM, Mike Percy <mpercy@apache.org> wrote:
>>
>>> Can you provide more detail about what kinds of services?
>>>
>>> If you roll the logs every 5 minutes or so, then you can configure the
>>> spooling source to pick them up once they are rolled, by either rolling
>>> them into a directory reserved for immutable files or using the trunk
>>> version of the spooling directory source to specify a filter that
>>> ignores files which don't match a "rolled" pattern.
>>>
>>> You could also use exec source with "tail -F" but that is much more
>>> unreliable than the spooling file source.
>>>
>>> Regards,
>>> Mike
>>>
>>>
>>> On Thu, Jan 17, 2013 at 10:23 PM, Henry Ma <henry.ma.1986@gmail.com> wrote:
>>>
>>>> OK, thank you very much, now I know why the problem occurs.
>>>>
>>>> I am a newcomer to Flume. Here is my scenario: using Flume to collect
>>>> from hundreds of directories on dozens of servers into central
>>>> storage. It seems that the spooling directory source may not be the
>>>> best choice. Can someone give me some advice about how to design the
>>>> architecture? Which types of source and sink would fit?
>>>>
>>>> Thanks!
>>>>
>>>>
>>>> On Fri, Jan 18, 2013 at 2:05 PM, Mike Percy <mpercy@apache.org> wrote:
>>>>
>>>>> Hi Henry,
>>>>> The files must be immutable before you put them into the spooling
>>>>> directory. So if you copy them from a different file system, you can
>>>>> run into this issue. The right way to do it is to copy them onto the
>>>>> same file system and then atomically move them into the spooling
>>>>> directory.
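>>>>>
>>>>> For example (paths hypothetical, with the staging directory on the
>>>>> same file system as the spooling directory):
>>>>>
>>>>> cp /var/log/app.log.1 /disk2/spool-staging/app.log.1
>>>>> mv /disk2/spool-staging/app.log.1 /disk2/spool/app.log.1
>>>>>
>>>>> The mv is then a rename within one file system, so the file appears
>>>>> in the spooling directory fully written, all at once.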
>>>>>
>>>>> Regards,
>>>>> Mike
>>>>>
>>>>>
>>>>> On Thu, Jan 17, 2013 at 9:59 PM, Henry Ma <henry.ma.1986@gmail.com> wrote:
>>>>>
>>>>>> Thank you very much! I cleaned all the related dirs and restarted.
>>>>>> I kept the source spooling dir empty, then started Flume, and then
>>>>>> put some files into the spooling dir. But this time a new error
>>>>>> occurred:
>>>>>>
>>>>>> 13/01/18 13:44:24 INFO avro.SpoolingFileLineReader: Preparing to move file
>>>>>> /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118112700-20130118112800.hs016.ssp
>>>>>> to /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118112700-20130118112800.hs016.ssp.COMPLETED
>>>>>> 13/01/18 13:44:24 ERROR source.SpoolDirectorySource: Uncaught
>>>>>> exception in Runnable
>>>>>> java.lang.IllegalStateException: File has changed size since being
>>>>>> read:
>>>>>> /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118112700-20130118112800.hs016.ssp
>>>>>>         at org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:241)
>>>>>>         at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:185)
>>>>>>         at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
>>>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>>>         at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>>>>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>>>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>>>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>>>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>>         at java.lang.Thread.run(Thread.java:662)
>>>>>> 13/01/18 13:44:24 ERROR source.SpoolDirectorySource: Uncaught
>>>>>> exception in Runnable
>>>>>> java.io.IOException: Stream closed
>>>>>>         at java.io.BufferedReader.ensureOpen(BufferedReader.java:97)
>>>>>>         at java.io.BufferedReader.readLine(BufferedReader.java:292)
>>>>>>         at java.io.BufferedReader.readLine(BufferedReader.java:362)
>>>>>>         at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:180)
>>>>>>         at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
>>>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>>>         at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>>>>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>>>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>>>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>>>>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>>         at java.lang.Thread.run(Thread.java:662)
>>>>>> 13/01/18 13:44:25 ERROR source.SpoolDirectorySource: Uncaught
>>>>>> exception in Runnable
>>>>>> java.io.IOException: Stream closed
>>>>>>         at java.io.BufferedReader.ensureOpen(BufferedReader.java:97)
>>>>>>
>>>>>>
>>>>>> I think this is a typical scenario: Flume is watching some dirs and
>>>>>> collecting newly arriving files. I don't know why the exception
>>>>>> "File has changed size since being read" was thrown, or how to avoid
>>>>>> it. Can you give me some advice and guidance? Thanks!
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 18, 2013 at 1:48 PM, Patrick Wendell <pwendell@gmail.com> wrote:
>>>>>>
>>>>>>> Hey Henry,
>>>>>>>
>>>>>>> The Spooling source assumes that each file is uniquely named. If it
>>>>>>> sees that a file name has arrived which it has already processed
>>>>>>> (and has rolled over to a COMPLETED file), it throws an error and
>>>>>>> shuts down. This is to try to prevent sending duplicate data
>>>>>>> downstream.
>>>>>>>
>>>>>>> Probably the best idea is to clear out the COMPLETED file (and the
>>>>>>> original file, if they are indeed the same one) and restart.
>>>>>>>
>>>>>>> - Patrick
>>>>>>>
>>>>>>> On Thu, Jan 17, 2013 at 9:31 PM, Brock Noland <brock@cloudera.com>
>>>>>>> wrote:
>>>>>>> > Hmm, I think this is probably the root cause. Looks like there
>>>>>>> > was a file with that name already in use.
>>>>>>> >
>>>>>>> > 13/01/18 13:16:59 ERROR source.SpoolDirectorySource: Uncaught
>>>>>>> > exception in Runnable
>>>>>>> > java.lang.IllegalStateException: File name has been re-used with
>>>>>>> > different files. Spooling assumption violated for
>>>>>>> > /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118100000-20130118100100.hs009.ssp.COMPLETED
>>>>>>> >   at org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:272)
>>>>>>> >   at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:185)
>>>>>>> >   at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
>>>>>>> >   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>>>> >   at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>>>>>>> >   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>>>>>>> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>>>>>>> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>>>>>>> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>>>>>>> >   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>>> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>>> >   at java.lang.Thread.run(Thread.java:662)
>>>>>>> >
>>>>>>> > On Thu, Jan 17, 2013 at 9:22 PM, Henry Ma <henry.ma.1986@gmail.com>
>>>>>>> wrote:
>>>>>>> >> attached is the log file.
>>>>>>> >>
>>>>>>> >> the content of conf file:
>>>>>>> >> # Name the components on this agent
>>>>>>> >> a1.sources = r1
>>>>>>> >> a1.sinks = k1
>>>>>>> >> a1.channels = c1
>>>>>>> >>
>>>>>>> >> # Describe/configure the source
>>>>>>> >> a1.sources.r1.type = spooldir
>>>>>>> >> a1.sources.r1.spoolDir = /disk2/mahy/FLUME_TEST/source
>>>>>>> >> a1.sources.r1.channels = c1
>>>>>>> >>
>>>>>>> >> # Describe the sink
>>>>>>> >> a1.sinks.k1.type = file_roll
>>>>>>> >> a1.sinks.k1.sink.directory = /disk2/mahy/FLUME_TEST/sink
>>>>>>> >> a1.sinks.k1.sink.rollInterval = 0
>>>>>>> >>
>>>>>>> >> # Use a channel which buffers events in memory
>>>>>>> >> a1.channels.c1.type = memory
>>>>>>> >> a1.channels.c1.capacity = 99999
>>>>>>> >> #a1.channels.c1. = /disk2/mahy/FLUME_TEST/check
>>>>>>> >> #a1.channels.c1.dataDirs = /disk2/mahy/FLUME_TEST/channel-data
>>>>>>> >>
>>>>>>> >> # Bind the source and sink to the channel
>>>>>>> >> a1.sources.r1.channels = c1
>>>>>>> >> a1.sinks.k1.channel = c1
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> On Fri, Jan 18, 2013 at 12:39 PM, Brock Noland <brock@cloudera.com> wrote:
>>>>>>> >>>
>>>>>>> >>> Hi,
>>>>>>> >>>
>>>>>>> >>> Would you mind turning logging to debug and then posting your
>>>>>>> >>> full log/config?
>>>>>>> >>>
>>>>>>> >>> Brock
>>>>>>> >>>
>>>>>>> >>> On Thu, Jan 17, 2013 at 8:24 PM, Henry Ma <henry.ma.1986@gmail.com> wrote:
>>>>>>> >>> > Hi,
>>>>>>> >>> >
>>>>>>> >>> > When using the Spooling Directory Source in Flume NG 1.3.1,
>>>>>>> >>> > this exception happens:
>>>>>>> >>> >
>>>>>>> >>> > 13/01/18 11:37:09 ERROR source.SpoolDirectorySource: Uncaught
>>>>>>> >>> > exception in Runnable
>>>>>>> >>> > java.io.IOException: Stream closed
>>>>>>> >>> > at java.io.BufferedReader.ensureOpen(BufferedReader.java:97)
>>>>>>> >>> > at java.io.BufferedReader.readLine(BufferedReader.java:292)
>>>>>>> >>> > at java.io.BufferedReader.readLine(BufferedReader.java:362)
>>>>>>> >>> > at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:180)
>>>>>>> >>> > at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
>>>>>>> >>> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>>>> >>> > at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>>>>>>> >>> > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>>>>>>> >>> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>>>>>>> >>> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>>>>>>> >>> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>>>>>>> >>> > at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>>> >>> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>>> >>> > at java.lang.Thread.run(Thread.java:662)
>>>>>>> >>> >
>>>>>>> >>> > It usually happens after dropping some new files into the
>>>>>>> >>> > spooling dir, and then it stops collecting files. Does someone
>>>>>>> >>> > know the reason and how to avoid it?
>>>>>>> >>> >
>>>>>>> >>> > Thanks very much!
>>>>>>> >>> > --
>>>>>>> >>> > Best Regards,
>>>>>>> >>> > Henry Ma
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> --
>>>>>>> >>> Apache MRUnit - Unit testing MapReduce -
>>>>>>> >>> http://incubator.apache.org/mrunit/
>>>>>>> >>
>>>>>>> >>
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> --
>>>>>>> >> Best Regards,
>>>>>>> >> Henry Ma
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > --
>>>>>>> > Apache MRUnit - Unit testing MapReduce -
>>>>>>> http://incubator.apache.org/mrunit/
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Henry Ma
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Henry Ma
>>>>
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> 马环宇
>> 网易有道 EAD-Platform
>> POPO:   mahy@corp.netease.com
>> MSN:    henry.ma.1986@gmail.com
>> MOBILE: 18600601996
>>
>
>


-- 
Best Regards,
马环宇
网易有道 EAD-Platform
POPO:   mahy@corp.netease.com
MSN:    henry.ma.1986@gmail.com
MOBILE: 18600601996
