flume-user mailing list archives

From Johny Rufus <jru...@cloudera.com>
Subject Re: Urgent help: Flume Uncaught Exception
Date Wed, 22 Jul 2015 15:35:24 GMT
A couple of things.
1) You should not delete the file while it is still being read. This could
lead to the IllegalStateException.
2) As you pointed out, you could also hit this issue when 0-byte files are
placed in the spool directory and the fix for FLUME-1934 is not present.
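
One way to follow both points is to finish writing each file outside the
spool directory and only then move it in. The sketch below is illustrative
rather than a tested fix: publish_to_spool and the staging layout are
hypothetical names, and it assumes the staging directory sits on the same
filesystem as the spool directory so that the rename is atomic.

```python
import os
from pathlib import Path


def publish_to_spool(staged: Path, spool_dir: Path) -> bool:
    """Move a finished file into the spool directory.

    Returns False (and leaves the file in place) when it is empty.
    Hypothetical helper, not part of Flume.
    """
    # Skip 0-byte files: without the FLUME-1934 fix they may trigger
    # the IllegalStateException discussed in this thread.
    if staged.stat().st_size == 0:
        return False
    # os.replace is an atomic rename when source and destination are on
    # the same filesystem, so the spool directory never holds a file
    # that is still being written.
    os.replace(staged, spool_dir / staged.name)
    return True
```

Once a file has been moved into the spool directory, the producer should not
touch it again; with deletePolicy = immediate, as in the configuration quoted
below, Flume removes the file itself after it has been consumed.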

Thanks,
Rufus

On Wed, Jul 22, 2015 at 8:25 AM, Nikhil Gs <gsnikhil1432010@gmail.com>
wrote:

> I am deleting the file after it is placed in the spooling directory.
>
> Is this because of my Flume version, i.e. Flume 1.5.0-cdh5.4.1?
> I ask because I noticed that the same issue was resolved in Flume 1.6:
> https://issues.apache.org/jira/browse/FLUME-1934
>
> Thanks,
> Flume User.
>
>
> On Wed, Jul 22, 2015 at 10:16 AM, Johny Rufus <jrufus@cloudera.com> wrote:
>
>> Are you renaming or deleting the file after it has been placed in the
>> spooling directory?
>>
>> Thanks,
>> Rufus
>>
>> On Wed, Jul 22, 2015 at 6:41 AM, Nikhil Gs <gsnikhil1432010@gmail.com>
>> wrote:
>>
>>> Hello Everyone,
>>>
>>> I am facing a problem with the Flume spooling directory source.
>>> Below is my configuration:
>>>
>>> # Please paste flume.conf here. Example:
>>>
>>> # Sources, channels, and sinks are defined per
>>> # agent name, in this case 'pnm'.
>>> pnm.sources  = SPOOL
>>> pnm.channels = MemChannel
>>> pnm.sinks    = AVRO
>>>
>>> # For each source, channel, and sink, set
>>> # standard properties.
>>> pnm.sources.SPOOL.type          = spooldir
>>> pnm.sources.SPOOL.spoolDir      = /home/s_sdldalplhdxxxedh/pnm-poll-results
>>> pnm.sources.SPOOL.channels      = MemChannel MemChannel2
>>> pnm.sources.SPOOL.fileHeader    = true
>>> pnm.sources.SPOOL.deletePolicy  = immediate
>>> pnm.sources.SPOOL.consumeOrder  = oldest
>>> pnm.sources.SPOOL.batchSize     = 1
>>>
>>> pnm.sources.SPOOL.interceptors = time
>>> pnm.sources.SPOOL.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
>>> pnm.sources.SPOOL.deserializer  = com.suddenlink.flume.WholeFileDeserializer$Builder
>>>
>>> pnm.sinks.AVRO.type         = avro
>>> pnm.sinks.AVRO.channel      = MemChannel
>>> pnm.sinks.AVRO.hostname = sdldalplhdw01.suddenlink.cequel3.com
>>> pnm.sinks.AVRO.port     = 40001
>>> pnm.sinks.AVRO.batchSize = 1
>>> pnm.sinks.AVRO.connect-timeout = 40000
>>>
>>>
>>> # pnm.sinks.HDFS.type         = hdfs
>>> # pnm.sinks.HDFS.channel      = MemChannel2
>>> # pnm.sinks.HDFS.hdfs.path = /user/flume/poll/%Y/%m/%d/%H/
>>> # pnm.sinks.HDFS.hdfs.fileType = DataStream
>>> # pnm.sinks.HDFS.hdfs.writeFormat = Text
>>> # pnm.sinks.HDFS.hdfs.batchSize = 100
>>> # pnm.sinks.HDFS.hdfs.rollSize = 0
>>> # pnm.sinks.HDFS.hdfs.rollCount = 1000
>>> # pnm.sinks.HDFS.hdfs.rollInterval = 600
>>>
>>> # Other properties are specific to each type of
>>> # source, channel, or sink. In this case, we
>>> # specify the capacity of the memory channel.
>>>
>>> pnm.channels.MemChannel.capacity = 1000000
>>> pnm.channels.MemChannel.type   = memory
>>>
>>> # pnm.channels.MemChannel2.capacity = 10000
>>> # pnm.channels.MemChannel2.type   = memory
>>>
>>>
>>> *Error Log file*
>>>
>>> org.apache.flume.source.SpoolDirectorySource
>>> FATAL: Spool Directory source SPOOL: { spoolDir: /home/s_sdldalplhdxxxedh/pnm-poll-results }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
>>> java.lang.IllegalStateException: File should not roll when commit is outstanding.
>>>     at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:235)
>>>     at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> I have tried to restart the agent, but the same error is appearing.
>>>
>>> Regards,
>>> Nikhil
>>>
>>
>>
>
