From: Connor Woodson
To: user@flume.apache.org
Date: Fri, 18 Jan 2013 01:13:27 -0800
Subject: Re: Uncaught Exception When Using Spooling Directory Source

The Spooling Directory Source is best used for sending old data or backups through Flume rather than real-time data because, as you discovered, you aren't supposed to write directly to a file in that directory; you place closed files there instead. You could implement what Mike mentioned above about rolling the logs into the spooling directory, but there are other options.

If you are looking to pull data in real time, the Exec Source mentioned above does work. Its one downside is that it is not the most reliable source, as the red box in the linked documentation mentions, and you will have to monitor it to make sure it hasn't crashed. However, apart from the Spooling Directory source and any custom source you write, it is the only other pulling source.

But depending on how your system is set up, you could set up a system for pushing your logs into Flume. Here are some options:

If the applications producing the logs you want to capture use Log4J, then there is a Log4JAppender which will send events directly to Flume. The benefit of this is that you let Flume take control of the events right as they are generated; they are sent through Avro to your specified host/IP, where you will have a Flume agent with an Avro Source running.

Another alternative, if you don't use Log4J but do have direct control over the application, is to use the Embedded Flume Agent. This is even more powerful than the Log4J appender, as you have more control over how it works and you are able to use the Flume channels with it. This would end up pushing events via Avro to your Flume agent, which would then collect/process/store them.

There are a variety of network methods that can communicate with Flume. Flume has support for listening on a specified port with the Netcat Source and for getting events via HTTP POST messages, and if your application uses Syslog, that's supported as well.
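
As a rough illustration of the Netcat Source option above, the sketch below frames log lines as newline-terminated text and writes them to a TCP port. It assumes a Netcat Source is already listening at the given host and port (both are placeholders), and the function names `frame_events` and `send_lines` are made up for this example. It does not read any acknowledgements the source may send back.

```python
import socket
from typing import Iterable


def frame_events(lines: Iterable[str]) -> bytes:
    """Encode each line as one newline-terminated UTF-8 record."""
    return "".join(line.rstrip("\r\n") + "\n" for line in lines).encode("utf-8")


def send_lines(host: str, port: int, lines: Iterable[str], timeout: float = 5.0) -> int:
    """Write the framed lines to host:port over one TCP connection.

    host and port are placeholders for wherever the Netcat Source listens.
    Returns the number of bytes sent; replies from the server are not read.
    """
    payload = frame_events(lines)
    with socket.create_connection((host, port), timeout=timeout) as conn:
        conn.sendall(payload)
    return len(payload)
```

A call might look like `send_lines("collector.example.com", 44444, ["GET /index.html 200"])`, where the host name and port are hypothetical.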

In summary, if you need to set up a pulling system you will need to place a Flume agent on each of your servers and have it use a Spooling Directory or Exec source; or, if your system is configurable enough, you can modify it in one of the ways above to push the logs to Flume.

I hope some of that was helpful,

- Connor

On Fri, Jan 18, 2013 at 12:18 AM, Henry Ma <henry.ma.1986@gmail.com> wrote:
We have an advertising system with hundreds of servers running services such as Resin and nginx, and each of them writes log files to a local directory every second. We need to collect all the log files promptly into central storage such as MooseFS for real-time analysis, and then archive them to HDFS every hour.

We want to deploy Flume to collect log files as soon as they are generated from nearly one hundred servers (servers may be added to or removed from the list at any time) to a central location, and then archive them to HDFS each hour.

At present the log files cannot be pushed to any collection system. We want the collection system to PULL all of them remotely.

Can you give me some guidance? Thanks!


On Fri, Jan 18, 2013 at 3:45 PM, Mike Percy <mpercy@apache.org> wrote:
Can you provide more detail about what kinds of services?

If you roll the logs every 5 minutes or so then you can configure the spooling source to pick them up once they are rolled, either by rolling them into a directory for immutable files or by using the trunk version of the spooling file source to specify a filter that ignores files that don't match a "rolled" pattern.
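
The idea of a "rolled" pattern can be illustrated outside Flume. The sketch below is not Flume's filter; it is a stand-alone Python check, with a regular expression guessed from the file names quoted elsewhere in this thread (`sspstat.log.<start>-<end>.<host>.ssp`), that keeps finished files and skips both the live log and anything already marked `.COMPLETED`. The names `ROLLED` and `rolled_files` are invented for the example.

```python
import re
from typing import Iterable, List

# Hypothetical pattern modelled on names such as
# sspstat.log.20130118112700-20130118112800.hs016.ssp
ROLLED = re.compile(r"^.+\.log\.\d{14}-\d{14}\..+$")


def rolled_files(names: Iterable[str]) -> List[str]:
    """Return only the names that look like finished, rolled log files."""
    return [
        name
        for name in names
        if ROLLED.match(name) and not name.endswith(".COMPLETED")
    ]
```

Anything the pattern rejects, such as a plain `sspstat.log` that is still being appended to, would stay out of the set of files handed to the spooling source.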

You could also use exec source with "tail -F", but that is much less reliable than the spooling file source.

Regards,
Mike


On Thu, Jan 17, 2013 at 10:23 PM, Henry Ma <henry.ma.1986@gmail.com> wrote:
OK, thank you very much, now I know why the problem occurs.

I am a newcomer to Flume. Here is my scenario: using Flume to collect logs from hundreds of directories on dozens of servers into central storage. It seems that the spooling directory source may not be the best choice. Can someone give me some advice about how to design the architecture? Which types of source and sink would fit?

Thanks!


On Fri, Jan 18, 2013 at 2:05 PM, Mike Percy <mpercy@apache.org> wrote:
Hi Henry,
The files must be immutable before they are put into the spooling directory. So if you copy them in from a different file system, you can run into this issue. The right way to do it is to copy them to the same file system and then atomically move them into the spooling directory.
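
The copy-then-move step described above might look like the following sketch. It assumes a staging directory that sits on the same file system as the spooling directory but outside it; `spool_atomically` and its parameter names are hypothetical. The slow copy happens in the staging directory, and only the final rename touches the spooling directory.

```python
import os
import shutil


def spool_atomically(src: str, staging_dir: str, spool_dir: str) -> str:
    """Copy src into staging_dir, then rename it into spool_dir in one step.

    staging_dir and spool_dir are assumed to be on the same file system;
    os.rename raises OSError if they are not.
    """
    name = os.path.basename(src)
    staged = os.path.join(staging_dir, name)
    final = os.path.join(spool_dir, name)
    shutil.copyfile(src, staged)  # the slow part, outside the spooling dir
    os.rename(staged, final)      # atomic on POSIX within one file system
    return final
```

With this arrangement a reader of the spooling directory sees either no file or the whole file, never a partially written one.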

Regards,
Mike


On Thu, Jan 17, 2013 at 9:59 PM, Henry Ma <henry.ma.1986@gmail.com> wrote:
Thank you very much! I cleaned all the related dirs and restarted. I kept the source spooling dir empty, then started Flume, and then put some files into the spooling dir. But this time a new error occurred:

13/01/18 13:44:24 INFO avro.SpoolingFileLineReader: Preparing to move file /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118112700-20130118112800.hs016.ssp to /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118112700-20130118112800.hs016.ssp.COMPLETED
13/01/18 13:44:24 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
java.lang.IllegalStateException: File has changed size since being read: /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118112700-20130118112800.hs016.ssp
        at org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:241)
        at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:185)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
13/01/18 13:44:24 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
java.io.IOException: Stream closed
        at java.io.BufferedReader.ensureOpen(BufferedReader.java:97)
        at java.io.BufferedReader.readLine(BufferedReader.java:292)
        at java.io.BufferedReader.readLine(BufferedReader.java:362)
        at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:180)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
13/01/18 13:44:25 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
java.io.IOException: Stream closed
        at java.io.BufferedReader.ensureOpen(BufferedReader.java:97)


I think it is a typical scenario: Flume is watching some dirs and collecting newly arriving files. I don't know why the exception "File has changed size since being read" was thrown or how to avoid it. Can you give me some advice and guidance? Thanks!
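
One way to picture the check behind this message: the reader notes a file's size when it starts, and refuses to mark the file as done if the size differs when it finishes. The sketch below is an illustration in Python, not Flume's code; `Snapshot`, `snapshot`, and `retire` are hypothetical names, and only the error text and the `.COMPLETED` suffix are taken from the log above.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    path: str
    size: int  # size in bytes when reading began


def snapshot(path: str) -> Snapshot:
    """Record the file's size before it is read."""
    return Snapshot(path, os.stat(path).st_size)


def retire(snap: Snapshot) -> str:
    """Rename the file to <name>.COMPLETED unless it grew or shrank meanwhile."""
    if os.stat(snap.path).st_size != snap.size:
        raise RuntimeError(f"File has changed size since being read: {snap.path}")
    done = snap.path + ".COMPLETED"
    os.rename(snap.path, done)
    return done
```

A file that is still being copied into the directory keeps growing between `snapshot` and `retire`, which is the situation this kind of check is meant to reject.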


On Fri, Jan 18, 2013 at 1:48 PM, Patrick Wendell <pwendell@gmail.com> wrote:
Hey Henry,

The Spooling source assumes that each file is uniquely named. If it
sees a new file arrive with a name that it has already processed (and
rolled over to a COMPLETED file), it throws an error and shuts down.
This is to try to prevent sending duplicate data downstream.

Probably the best idea is to clear out the COMPLETED file (and the
original file, if they are indeed the same one) and restart.

- Patrick
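
A pre-flight check along these lines can catch a reused name before the file ever reaches the spooling directory. This is a sketch only: `name_is_free` is a hypothetical helper, and the `.COMPLETED` suffix is taken from the logs in this thread.

```python
import os


def name_is_free(spool_dir: str, name: str, suffix: str = ".COMPLETED") -> bool:
    """True if neither `name` nor its completed counterpart exists in spool_dir."""
    pending = os.path.join(spool_dir, name)
    completed = os.path.join(spool_dir, name + suffix)
    return not (os.path.exists(pending) or os.path.exists(completed))
```

Whatever delivers files could call this first and pick a different name, or skip the file, when it returns `False`.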

On Thu, Jan 17, 2013 at 9:31 PM, Brock Noland <brock@cloudera.com> wrote:
> Hmm, I think this is probably the root cause. Looks like there was a
> file with that name already used.
>
> 13/01/18 13:16:59 ERROR source.SpoolDirectorySource: Uncaught
> exception in Runnable
> java.lang.IllegalStateException: File name has been re-used with
> different files. Spooling assumption violated for
> /disk2/mahy/FLUME_TEST/source/sspstat.log.20130118100000-20130118100100.hs009.ssp.COMPLETED
>   at org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:272)
>   at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:185)
>   at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>   at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>   at java.lang.Thread.run(Thread.java:662)
>
> On Thu, Jan 17, 2013 at 9:22 PM, Henry Ma <henry.ma.1986@gmail.com> wrote:
>> attached is the log file.
>>
>> the content of conf file:
>> # Name the components on this agent
>> a1.sources = r1
>> a1.sinks = k1
>> a1.channels = c1
>>
>> # Describe/configure the source
>> a1.sources.r1.type = spooldir
>> a1.sources.r1.spoolDir = /disk2/mahy/FLUME_TEST/source
>> a1.sources.r1.channels = c1
>>
>> # Describe the sink
>> a1.sinks.k1.type = file_roll
>> a1.sinks.k1.sink.directory = /disk2/mahy/FLUME_TEST/sink
>> a1.sinks.k1.sink.rollInterval = 0
>>
>> # Use a channel which buffers events in memory
>> a1.channels.c1.type = memory
>> a1.channels.c1.capacity = 99999
>> #a1.channels.c1. = /disk2/mahy/FLUME_TEST/check
>> #a1.channels.c1.dataDirs = /disk2/mahy/FLUME_TEST/channel-data
>>
>> # Bind the source and sink to the channel
>> a1.sources.r1.channels = c1
>> a1.sinks.k1.channel = c1
>>
>>
>> On Fri, Jan 18, 2013 at 12:39 PM, Brock Noland <brock@cloudera.com> wrote:
>>>
>>> Hi,
>>>
>>> Would you mind turning logging to debug and then posting your full
>>> log/config?
>>>
>>> Brock
>>>
>>> On Thu, Jan 17, 2013 at 8:24 PM, Henry Ma <henry.ma.1986@gmail.com> wrote:
>>> > Hi,
>>> >
>>> > When using Spooling Directory Source in Flume NG 1.3.1, this exception
>>> > happens:
>>> >
>>> > 13/01/18 11:37:09 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
>>> > java.io.IOException: Stream closed
>>> > at java.io.BufferedReader.ensureOpen(BufferedReader.java:97)
>>> > at java.io.BufferedReader.readLine(BufferedReader.java:292)
>>> > at java.io.BufferedReader.readLine(BufferedReader.java:362)
>>> > at org.apache.flume.client.avro.SpoolingFileLineReader.readLines(SpoolingFileLineReader.java:180)
>>> > at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:135)
>>> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>> > at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
>>> > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
>>> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
>>> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
>>> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
>>> > at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>> > at java.lang.Thread.run(Thread.java:662)
>>> >
>>> > It usually happens when I drop some new files into the spooling dir, and
>>> > Flume then stops collecting files. Does anyone know the reason and how to
>>> > avoid it?
>>> >
>>> > Thanks very much!
>>> > --
>>> > Best Regards,
>>> > Henry Ma
>>>
>>>
>>>
>>> --
>>> Apache MRUnit - Unit testing MapReduce -
>>> http://incubator.apache.org/mrunit/
>>
>>
>>
>>
>> --
>> Best Regards,
>> Henry Ma
>
>
>
> --
> Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/



--
Henry Ma

--
Henry Ma

--
Best Regards,
马环宇 (Henry Ma)
网易有道 (NetEase Youdao) EAD-Platform
POPO: mahy@corp.netease.com
MSN: henry.ma.1986@gmail.com
MOBILE: 18600601996
