flume-user mailing list archives

From: Snehal Nagmote <nagmote.sne...@gmail.com>
Subject: Re: Spooling Directory Source Stuck in Exception [Serializer has been closed]
Date: Thu, 31 Oct 2013 18:19:14 GMT
Thank you Brock for your reply. We changed our setup as you suggested, and
it has worked without any issues for more than 17 hrs.

We changed logrotate to copy the file in with an extension that Flume ignores
via excludePattern, and then rename it.

It seems the latency while the file is being moved was causing the issue
described in https://issues.apache.org/jira/browse/FLUME-2066 .
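
For reference, the changed postrotate section now looks roughly like this.
This is a minimal sketch: the .tmp extension is just what we picked, and
the property is called ignorePattern in the Flume user guide (excludePattern
above refers to the same setting):

postrotate
    timestamp=$(date +%s)
    # copy under an extension Flume ignores, so the half-copied file is never opened
    /bin/cp /var/log/nginx/access.log.1 /var/log/nginx/get/$timestamp.log.tmp
    # rename within the same directory, which is atomic on a single filesystem
    /bin/mv /var/log/nginx/get/$timestamp.log.tmp /var/log/nginx/get/$timestamp.log
endscript

# and in the agent configuration:
a1.sources.r1.ignorePattern = ^.*\.tmp$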


Thanks,
Snehal


On 30 October 2013 11:07, Brock Noland <brock@cloudera.com> wrote:

> Under certain circumstances mv will actually be a copy + delete. In that
> case the file size will change during the copy phase. I'd recommend copying
> it in with an extension which is ignored via excludePattern and then
> renaming it.
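>
> As a quick sanity check (GNU stat; the directory paths are from the script
> below), you can compare device IDs, since a rename is only atomic when the
> source and destination live on the same filesystem:
>
> # if the two numbers differ, mv degrades to copy + delete
> stat -c %d /var/log/nginx /var/log/nginx/get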
>
>
> On Wed, Oct 30, 2013 at 12:49 PM, Snehal Nagmote <nagmote.snehal@gmail.com> wrote:
>
>> Christopher,
>>
>> I am not able to fix the issue; in my case it works for 1-2 hrs before it
>> fails. I have a setup similar to yours. The only difference is that
>> logrotate moves files into the spooling directory source, and I am not
>> using the immediate delete policy ("a1.sources.r1.deletePolicy = immediate").
>>
>> It fails because of this exception,
>>
>> java.lang.IllegalStateException: File has changed size since being read:
>> /var/log/nginx/get/1383088201.log
>>  at
>> org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:286)
>>
>> But logrotate does not modify any file, so I am not sure why the file's
>> size would change. Please find the logrotate scripts below.
>>
>>
>> /var/log/nginx/access.log {
>>     notifempty
>>     olddir /var/log/nginx
>>     rotate 3
>>     postrotate
>>         timestamp=$(date +%s)
>>         /bin/mv /var/log/nginx/access.log.1 /var/log/nginx/get/$timestamp.log
>>         chown -R etlstage:users /var/log/nginx/get/
>>         [ ! -f /var/run/nginx.pid ] || kill -USR1 `cat /var/run/nginx.pid`
>>     endscript
>> }
>>
>> /var/log/nginx/post-access.log {
>>     notifempty
>>     olddir /var/log/nginx
>>     rotate 3
>>     postrotate
>>         timestamp=$(date +%s)
>>         /bin/mv /var/log/nginx/post-access.log.1 /var/log/nginx/post/$timestamp.log
>>         cd /var/log/nginx/post
>>         mv `ls -t | awk 'NR>13'` /var/log/nginx/post_archive/
>>         chown -R test:users /var/log/nginx/post/
>>         chown -R test:users /var/log/nginx/post_archive/
>>         [ ! -f /var/run/nginx_post.pid ] || kill -USR1 `cat /var/run/nginx_post.pid`
>>     endscript
>> }
>>
>> *My spooling directory sources are /var/log/nginx/get and /var/log/nginx/post*
>>
>>
>> Any pointers would be helpful.
>>
>> -Snehal
>>
>> On 30 October 2013 06:47, Christopher Surage <csurage@gmail.com> wrote:
>>
>>> I am having the same problem, except that I am only copying files in one
>>> at a time, and it occurs after I copy the first file. My configs are as
>>> follows.
>>>
>>> 1) First box
>>>
>>> # Name the components of the agent
>>> a1.sources = r1
>>> a1.sinks = k1
>>> a1.channels = c1
>>>
>>>
>>> ###############Describe/configure the source#################
>>> a1.sources.r1.type = spooldir
>>> a1.sources.r1.spoolDir = /home/desktop/flume_test/
>>> a1.sources.r1.deletePolicy = immediate
>>> a1.sources.r1.trackerDir = /home/desktop/.flume/
>>> a1.sources.r1.deserializer.maxLineLength = 10000
>>>
>>> # how many events to transfer to the channel at a time
>>> a1.sources.r1.batchSize = 1000
>>> a1.sources.r1.channels = c1
>>>
>>> ##############describe the sink#######################
>>> # Avro sink
>>> a1.sinks.k1.type = avro
>>> a1.sinks.k1.hostname = box2
>>> a1.sinks.k1.port = 9313
>>>
>>> # How many events to take from the channel at a time
>>> a1.sinks.k1.batch-size = 1000
>>>
>>> # Channel the sink connects to
>>> a1.sinks.k1.channel = c1
>>>
>>> ################describe the channel##################
>>> # use a channel which buffers events in memory
>>> a1.channels.c1.type = memory
>>>
>>> # How many events the channel holds
>>> a1.channels.c1.capacity = 10000
>>>
>>> # Max number of events to give to sink or take from source
>>> a1.channels.c1.transactionCapacity = 1000
>>> a1.channels.c1.byteCapacity = 0
>>>
>>>
>>> 2) Second Box
>>>
>>> # Name the components of the agent
>>> a1.sources = r1
>>> a1.sinks = k1
>>> a1.channels = c1
>>>
>>>
>>> ###############Describe/configure the source#################
>>> a1.sources.r1.type = avro
>>> a1.sources.r1.bind = box2
>>> a1.sources.r1.port = 9313
>>> a1.sources.r1.channels = c1
>>>
>>>
>>> ##############describe the sink#######################
>>> # HDFS sink
>>> a1.sinks.k1.type = hdfs
>>> a1.sinks.k1.hdfs.path = /user/hive
>>> a1.sinks.k1.hdfs.fileType = DataStream
>>> a1.sinks.k1.hdfs.rollSize = 0
>>> a1.sinks.k1.hdfs.rollCount = 5000000
>>> a1.sinks.k1.hdfs.rollInterval = 0
>>> a1.sinks.k1.hdfs.idleTimeout = 30
>>> a1.sinks.k1.hdfs.batchSize = 1000
>>>
>>> # Channel the sink connects to
>>> a1.sinks.k1.channel = c1
>>>
>>>
>>> ################describe the channel##################
>>> # use a channel which buffers events in memory
>>> a1.channels.c1.type = memory
>>>
>>> # How many events the channel can hold
>>> a1.channels.c1.capacity = 10000
>>>
>>> # Max number of events the channel can take from source
>>> # or give to a sink
>>> a1.channels.c1.transactionCapacity = 1000
>>> a1.channels.c1.byteCapacity = 0
>>>
>>>
>>> Any help would be appreciated.
>>>
>>> Chris
>>>
>>
>
>
> --
> Apache MRUnit - Unit testing MapReduce - http://mrunit.apache.org
>
