flume-user mailing list archives

From "Mangtani, Kushal" <Kushal.Mangt...@viasat.com>
Subject Flume Issues
Date Thu, 12 Jun 2014 07:48:22 GMT
Hi,

I have noticed a couple of issues in my Flume setup. Below I describe my setup, the issues,
and my flume.conf.

Setup:-

I'm using the Flume-NG 1.4 (CDH 4.4) tarball for collecting aggregated logs.
I am running a 2-tier (agent, collector) Flume configuration with custom plugins. There are
approximately 20 agent machines (receiving data) and 6 collector machines (writing to HDFS),
all running independently.
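
For reference, this is roughly what one box in the agent tier looks like. This is only a simplified
sketch, not our actual agent.conf (which uses the custom plugins mentioned above); the component
names, the source type, and the collector hostname are placeholders, and the only values carried
over from the real setup are the collector port 53000 and the batch size of 10000 seen in the
collector logs:

# Agent tier (simplified sketch; names and source type are illustrative)
tier1.sources = s1
tier1.channels = ch1
tier1.sinks = avroSink

# Application-specific source (a custom plugin in our real setup)
tier1.sources.s1.type = org.example.CustomAgentSource
tier1.sources.s1.channels = ch1

# Durable buffer on the agent box
tier1.channels.ch1.type = file
tier1.channels.ch1.checkpointDir = /usr/lib/flume-ng/datastore/agent/checkpoint
tier1.channels.ch1.dataDirs = /usr/lib/flume-ng/datastore/agent/logs

# Each agent forwards to one of the 6 collectors over Avro RPC
tier1.sinks.avroSink.type = avro
tier1.sinks.avroSink.channel = ch1
tier1.sinks.avroSink.hostname = collector-1.example.com
tier1.sinks.avroSink.port = 53000
tier1.sinks.avroSink.batch-size = 10000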

Issues :-

The agents seem to be running fine. However, I notice a couple of issues on the collector side
(the collector flume.conf is included at the end of this email):

  *   Issue 1 :- Assume Flume is writing a file on a data node, and that data node crashes
for some reason. Flume does not recover from this situation. Ideally, Flume should give up on
that file and continue processing; instead, we see it keep trying to reach the file, and if it
can't, it retries indefinitely and stops doing any other processing. Either I'm not doing
something right, or there is a bug in the HDFS Sink. (The write-path settings I am aware of
are listed after the log excerpt below.)

STDOUT of the logs:


10 Jun 2014 20:23:40,878 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.append:477)
 - Caught IOException writing to HDFSWriter (70000 millis timeout while waiting for channel
to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.64.4.22:58980
remote=/10.64.6.134:50010]). Closing file (hdfs://namenode/data/2014/06/10/1900/stream/c-6-record1.1402429221442.tmp)
and rethrowing exception.

10 Jun 2014 20:23:40,904 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.append:483)
 - Caught IOException while closing file (hdfs://namenode/data/2014/06/10/1900/rumnonhttp/c-6-rum24-nonhttprecord.1402429221442.tmp).
Exception follows.

java.net.SocketTimeoutException: 70000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.64.4.22:58980 remote=/10.64.6.134:50010]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:165)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:156)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:129)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:117)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed(HdfsProtoUtil.java:169)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:954)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:922)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1023)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:821)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)

10 Jun 2014 20:23:40,904 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:438)
 - HDFS IO error

java.net.SocketTimeoutException: 70000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.64.4.22:58980 remote=/10.64.6.134:50010]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:165)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:156)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:129)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:117)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed(HdfsProtoUtil.java:169)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:954)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:922)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1023)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:821)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463)

10 Jun 2014 20:23:41,617 INFO  [New I/O server boss #1 ([id: 0x5201a55f, /0.0.0.0:53000])]
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171)  - [id: 0x00b4bdbe,
/10.75.201.32:42877 => /10.64.4.22:53000] OPEN

10 Jun 2014 20:23:41,617 INFO  [New I/O  worker #8] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171)
 - [id: 0x00b4bdbe, /10.75.201.32:42877 => /10.64.4.22:53000] BOUND: /10.64.4.22:53000

10 Jun 2014 20:23:41,617 INFO  [New I/O  worker #8] (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171)
 - [id: 0x00b4bdbe, /10.75.201.32:42877 => /10.64.4.22:53000] CONNECTED: /10.75.201.32:42877

10 Jun 2014 20:23:43,686 DEBUG [New I/O  worker #8] (com.viasat.flume.sources.RUMFilterAvroSource.appendBatch:359)
 - Avro source AvroSource: Received avro event batch of 10000 events.

10 Jun 2014 20:23:44,646 ERROR [New I/O  worker #7] (com.viasat.flume.sources.RUMFilterAvroSource.appendBatch:401)
 - Avro source AvroSource: Unable to process event batch. Exception follows.

org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: NonHttpHdfsChannel}
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
        at com.viasat.flume.sources.RUMFilterAvroSource.appendBatch(RUMFilterAvroSource.java:398)
.....
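
For reference, these are the write-path settings I am aware of and am thinking about tuning for
Issue 1. The values below are only what I am considering, not what is deployed (we currently run
with hdfs.callTimeout = 30000), and I am not sure they are the right fix; ideally the sink would
recover on its own:

# Flume HDFS sink side (the same change would apply to k1..k5)
# Allow more time for HDFS pipeline recovery before the sink call is abandoned (milliseconds)
agent.sinks.k1.hdfs.callTimeout = 60000
# Close files that stop receiving writes so a stuck file is eventually abandoned (seconds)
agent.sinks.k1.hdfs.idleTimeout = 300

# HDFS client side (hdfs-site.xml on the collector hosts, written here as name = value).
# These control the addDatanode2ExistingPipeline behaviour visible in the stack trace above.
# dfs.client.block.write.replace-datanode-on-failure.enable = true
# dfs.client.block.write.replace-datanode-on-failure.policy = DEFAULT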


  *   Issue 2 :- I am using a File Channel in my collector.conf. I noticed that the replay of
logs takes a lot of time: around 24 hours to replay 12 GB of data. I am using an Amazon EBS
provisioned-IOPS volume for file channel storage, and dual checkpoints are enabled in the file
channel configuration. On parsing the Flume logs, I noticed a BadCheckpointException.

So, putting all the pieces together: Flume found a bad checkpoint and tried to replay all
12 GB worth of logs. What makes replaying 12 GB of logs take around 24 hours?
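
For what it is worth, my rough math on the replay rate: 12 GB over 24 hours is about
12 * 1024 * 1024 KB / 86400 s ≈ 145 KB/s, and at roughly 2 KB per event that is only about
70 events/s (about 6.3 million events in total). That seems far slower than what a
provisioned-IOPS EBS volume should sustain, which is why I suspect the bottleneck is not raw
disk throughput.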


P.S.: Each event/record is ~2 KB on average.




Flume Collector Configuration  :-

# Name the components on this agent
agent.sources = r1
agent.channels = c1 c2 c3 c4 c5
agent.sinks = k1 k2 k3 k4 k5


# Describe/configure the source r1
agent.sources.r1.type = CustomSource
agent.sources.r1.channels = c1 c2 c3 c4 c5
agent.sources.r1.bind = 0.0.0.0
agent.sources.r1.port = 53000
agent.sources.r1.schemaFolder = /usr/lib/flume-ng/schema
agent.sources.r1.selector.type = multiplexing
agent.sources.r1.selector.header = rectype
agent.sources.r1.selector.mapping.Record-1 = c1
agent.sources.r1.selector.mapping.Record-2 = c2
agent.sources.r1.selector.mapping.Record-3 = c3
agent.sources.r1.selector.mapping.Record-4 = c4
agent.sources.r1.selector.mapping.Record-5 = c5

# c1 channel config
agent.channels.c1.type = file
agent.channels.c1.useDualCheckpoints = true
agent.channels.c1.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-1-channel/checkpoint
agent.channels.c1.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-1-channel/backUpCheckpoint
agent.channels.c1.dataDirs = /usr/lib/flume-ng/datastore/collector/record-1-channel/logs
agent.channels.c1.capacity = 30000
agent.channels.c1.transactionCapacity = 3000
agent.channels.c1.write-timeout = 30
agent.channels.c1.keep-alive = 30


#c2 channel config
agent.channels.c2.type = file
agent.channels.c2.useDualCheckpoints = true
agent.channels.c2.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-2-channel/checkpoint
agent.channels.c2.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-2-channel/backUpCheckpoint
agent.channels.c2.dataDirs = /usr/lib/flume-ng/datastore/collector/record-2-channel/logs
agent.channels.c2.capacity = 30000
agent.channels.c2.transactionCapacity = 3000
agent.channels.c2.write-timeout = 30
agent.channels.c2.keep-alive = 30


# c3 channel config
agent.channels.c3.type = file
agent.channels.c3.useDualCheckpoints = true
agent.channels.c3.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-3-channel/checkpoint
agent.channels.c3.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-3-channel/backUpCheckpoint
agent.channels.c3.dataDirs = /usr/lib/flume-ng/datastore/collector/record-3-channel/logs
agent.channels.c3.capacity = 30000
agent.channels.c3.transactionCapacity = 3000
agent.channels.c3.write-timeout = 30
agent.channels.c3.keep-alive = 30


#c4 channel config
agent.channels.c4.type = file
agent.channels.c4.useDualCheckpoints = true
agent.channels.c4.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-4-channel/checkpoint
agent.channels.c4.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-4-channel/backUpCheckpoint
agent.channels.c4.dataDirs = /usr/lib/flume-ng/datastore/collector/record-4-channel/logs
agent.channels.c4.capacity = 30000
agent.channels.c4.transactionCapacity = 3000
agent.channels.c4.write-timeout = 30
agent.channels.c4.keep-alive = 30


#c5 channel config
agent.channels.c5.type = file
agent.channels.c5.useDualCheckpoints = true
agent.channels.c5.checkpointDir = /usr/lib/flume-ng/datastore/collector/record-5-channel/checkpoint
agent.channels.c5.backUpCheckpointDir = /usr/lib/flume-ng/datastore/collector/record-5-channel/backUpCheckpoint
agent.channels.c5.dataDirs = /usr/lib/flume-ng/datastore/collector/record-5-channel/logs
agent.channels.c5.capacity = 30000
agent.channels.c5.transactionCapacity = 3000
agent.channels.c5.write-timeout = 30
agent.channels.c5.keep-alive = 30




#k1 sink config
agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.serializer = CustomSerializer$Builder
agent.sinks.k1.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k1.serializer.schemaVersion = 24
agent.sinks.k1.serializer.syncIntervalBytes = 4096000
#agent.sinks.k1.serializer = avro
agent.sinks.k1.serializer.compressionCodec = snappy
agent.sinks.k1.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-1
agent.sinks.k1.hdfs.filePrefix = rec-1
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollInterval = 1200
agent.sinks.k1.hdfs.rollCount = 0
agent.sinks.k1.hdfs.callTimeout = 30000
agent.sinks.k1.hdfs.batchSize = 1000

#k2 sink config
agent.sinks.k2.type = hdfs
agent.sinks.k2.channel = c2
agent.sinks.k2.hdfs.fileType = DataStream
agent.sinks.k2.serializer = CustomSerializer$Builder
agent.sinks.k2.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k2.serializer.schemaVersion = 24
agent.sinks.k2.serializer.syncIntervalBytes = 4096000
#agent.sinks.k2.serializer = avro
agent.sinks.k2.serializer.compressionCodec = snappy
agent.sinks.k2.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-2
agent.sinks.k2.hdfs.filePrefix = rec-2
agent.sinks.k2.hdfs.rollSize = 0
agent.sinks.k2.hdfs.rollInterval = 1200
agent.sinks.k2.hdfs.rollCount = 0
agent.sinks.k2.hdfs.callTimeout = 30000
agent.sinks.k2.hdfs.batchSize = 1000

#k3 sink config
agent.sinks.k3.type = hdfs
agent.sinks.k3.channel = c3
agent.sinks.k3.hdfs.fileType = DataStream
agent.sinks.k3.serializer = CustomSerializer$Builder
agent.sinks.k3.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k3.serializer.schemaVersion = 24
agent.sinks.k3.serializer.syncIntervalBytes = 4096000
#agent.sinks.k3.serializer = avro
agent.sinks.k3.serializer.compressionCodec = snappy
agent.sinks.k3.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-3
agent.sinks.k3.hdfs.filePrefix = rec-3
agent.sinks.k3.hdfs.rollSize = 0
agent.sinks.k3.hdfs.rollInterval = 1200
agent.sinks.k3.hdfs.rollCount = 0
agent.sinks.k3.hdfs.callTimeout = 30000
agent.sinks.k3.hdfs.batchSize = 1000

#k4 sink config
agent.sinks.k4.type = hdfs
agent.sinks.k4.channel = c4
agent.sinks.k4.hdfs.fileType = DataStream
agent.sinks.k4.serializer = CustomSerializer$Builder
agent.sinks.k4.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k4.serializer.schemaVersion = 24
agent.sinks.k4.serializer.syncIntervalBytes = 4096000
#agent.sinks.k4.serializer = avro
agent.sinks.k4.serializer.compressionCodec = snappy
agent.sinks.k4.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-4
agent.sinks.k4.hdfs.filePrefix = rec-4
agent.sinks.k4.hdfs.rollSize = 0
agent.sinks.k4.hdfs.rollInterval = 1200
agent.sinks.k4.hdfs.rollCount = 0
agent.sinks.k4.hdfs.callTimeout = 30000
agent.sinks.k4.hdfs.batchSize = 1000

#k5 sink config
agent.sinks.k5.type = hdfs
agent.sinks.k5.channel = c5
agent.sinks.k5.hdfs.fileType = DataStream
agent.sinks.k5.serializer = CustomSerializer$Builder
agent.sinks.k5.serializer.schemaFile = /usr/lib/flume-ng/schema/schema
agent.sinks.k5.serializer.schemaVersion = 24
agent.sinks.k5.serializer.syncIntervalBytes = 4096000
#agent.sinks.k5.serializer = avro
agent.sinks.k5.serializer.compressionCodec = snappy
agent.sinks.k5.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/record-5
agent.sinks.k5.hdfs.filePrefix = rec-5
agent.sinks.k5.hdfs.rollSize = 0
agent.sinks.k5.hdfs.rollInterval = 1200
agent.sinks.k5.hdfs.rollCount = 0
agent.sinks.k5.hdfs.callTimeout = 30000
agent.sinks.k5.hdfs.batchSize = 1000
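
For completeness, these are the additional file-channel knobs I am looking at to limit how much
work a replay after a bad checkpoint has to do. They are shown for c1 only, the values are guesses
rather than what is currently deployed, and I may well be looking at the wrong settings:

# Write checkpoints more often than the 30000 ms default
agent.channels.c1.checkpointInterval = 15000
# Cap the size (in bytes) of an individual channel data file (default is about 2 GB)
agent.channels.c1.maxFileSize = 536870912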


Any inputs/suggestions?


Thanks
-Kushal Mangtani











