flume-user mailing list archives

From Ed Chiu <sendtoed...@gmail.com>
Subject tuning s3 sink throughput
Date Thu, 22 Aug 2013 19:21:14 GMT
Greetings!

I want to tune the throughput of my downstream S3 sink so that it sends bigger
batches of data to S3. The goal is to roll bigger files (over 10 MB) to S3
without unnecessary delay. Any tuning recommendations for this end goal?

I've experimented by creating more sinks (8 in total), each with a bigger
batchSize, in order to transfer big batches to S3 without delay. In my case,
big batches (batchSize = 50000) are still too slow to drain the channel; the
channel reaches its maximum capacity very quickly.
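
For context on how I understand the batch/transaction interplay (a sketch with
hypothetical numbers, not my running config): the file channel's
transactionCapacity has to be at least as large as the biggest batchSize any
attached sink uses, because a sink take of up to batchSize events is committed
as one channel transaction.

# sketch, hypothetical numbers, illustrating the constraint only:
# transactionCapacity >= the largest sink batchSize
agent.channels.s3_channel.transactionCapacity = 600000
agent.sinks.s3_sink1.hdfs.batchSize = 500000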

Roll settings being tuned:
- Roll by size: rollSize = 20000 -> 50000; draining backs up (see the sketch
after this list).
- Roll by timer: rollInterval = 600. Below 600, the files are too small
(different threads break the events for the same partition into small chunks),
and the channel still isn't drained fast enough.
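
On the size-based roll: if I'm reading the docs right, hdfs.rollSize is
specified in bytes, so 20000-50000 is only 20-50 KB. A sketch of what a
10 MB size-based roll would look like (rollInterval kept as a backstop so a
slow partition still rolls eventually):

# sketch: hdfs.rollSize is in bytes, so 10 MB = 10485760;
# keep a timer roll as a backstop for slow partitions
agent.sinks.s3_sink1.hdfs.rollSize = 10485760
agent.sinks.s3_sink1.hdfs.rollCount = 0
agent.sinks.s3_sink1.hdfs.rollInterval = 600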

Overall flow:
- ~20 upstream agent nodes: 20 MB/min -> spoolDir -> file channel -> avro sink
(batchSize = 1000)
- Downstream aggregation node: avro source -> file channel -> 8 s3 sinks (each
with hdfs.batchSize = 500000)
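
A rough back-of-envelope on the resulting file sizes (assuming the 20 MB/min
above is the aggregate rate across hosts; if it's per node, scale accordingly):

    20 MB/min / ~20 hosts                 ~ 1 MB/min per host partition
    1 MB/min * 5 min (rollInterval = 300) ~ 5 MB per host per roll
    5 MB / 8 sinks on the same partition  ~ 0.6 MB raw per file
    0.6 MB after gzip (maybe 3-6x)        ~ 100-250 KB per upload

which roughly matches the ~170 KB and ~240 KB Content-Length values in the log
excerpts below.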

=====
content from the same original host got broken into small chunks
=====
22 Aug 2013 11:34:01,292 INFO  [hdfs-s3_sink4-roll-timer-0]
(org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsOutputStream.close:190)
 - OutputStream for key
'flume/bidder/dt=2013-08-22/hour=10/minute=40/host=fe12/201308221040.1377193517830.gz.tmp'
closed. Now beginning upload
22 Aug 2013 11:34:01,293 DEBUG [hdfs-s3_sink4-roll-timer-0]
(org.jets3t.service.impl.rest.httpclient.RestS3Service.putObjectImpl:1260)
 - Uploading object data with Content-Length: 237899
22 Aug 2013 11:34:01,344 INFO  [hdfs-s3_sink1-roll-timer-0]
(org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsOutputStream.close:190)
 - OutputStream for key
'flume/bidder/dt=2013-08-22/hour=10/minute=40/host=fe12/201308221040.1377193517827.gz.tmp'
closed. Now beginning upload
22 Aug 2013 11:34:01,339 DEBUG [hdfs-s3_sink3-roll-timer-0]
(org.jets3t.service.impl.rest.httpclient.RestS3Service.putObjectImpl:1260)
 - Uploading object data with Content-Length: 171489


=====
many tiny chunks despite the large batch size
=====
22 Aug 2013 09:47:28,999 DEBUG [hdfs-s3_sink4-roll-timer-0]
(org.jets3t.service.utils.ServiceUtils.cleanRestMetadataMap:264)  - Leaving
HTTP header item unchanged: Content-Length=1716
22 Aug 2013 09:47:29,551 DEBUG [hdfs-s3_sink4-roll-timer-0]
(org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler.endElement:412)
 - Created new S3Object from listing: S3Object
22 Aug 2013 09:47:29,607 DEBUG [hdfs-s3_sink4-roll-timer-0]
(org.apache.commons.httpclient.Wire.wire:69)  - << "Content-Length:
1716[\r][\n]"
, Content-Length: 1716
22 Aug 2013 09:47:29,627 DEBUG [hdfs-s3_sink4-roll-timer-0]
(org.jets3t.service.utils.ServiceUtils.cleanRestMetadataMap:264)  - Leaving
HTTP header item unchanged: Content-Length=1716
22 Aug 2013 09:47:29,643 DEBUG [hdfs-s3_sink4-roll-timer-0]
(org.apache.commons.httpclient.Wire.wire:69)  - >> "Content-Length:
0[\r][\n]"
22 Aug 2013 09:47:29,724 DEBUG [hdfs-s3_sink4-roll-timer-0]
(org.apache.commons.httpclient.Wire.wire:69)  - << "Content-Length:
234[\r][\n]"
, Content-Length: 234
22 Aug 2013 09:47:29,772 DEBUG [hdfs-s3_sink4-roll-timer-0]
(org.jets3t.service.utils.ServiceUtils.cleanRestMetadataMap:264)  - Leaving
HTTP header item unchanged: Content-Length=234






====
Sample config
====
agent.sources = default_avro_source
agent.channels = s3_channel
agent.sinks = s3_sink1 s3_sink2 s3_sink3 s3_sink4 s3_sink5 s3_sink6 s3_sink7 s3_sink8


# default avro source
agent.sources.default_avro_source.type = avro
agent.sources.default_avro_source.bind = 0.0.0.0
agent.sources.default_avro_source.port = 9998
agent.sources.default_avro_source.selector.type = replicating

# s3 channel
agent.channels.s3_channel.type = file
agent.channels.s3_channel.checkpointDir = /flume/channels/s3/checkpoint
agent.channels.s3_channel.dataDirs = /flume/channels/s3/data
agent.channels.s3_channel.capacity = 200000000
agent.channels.s3_channel.transactionCapacity = 3000000
agent.channels.s3_channel.use-fast-replay = true


# s3 sink
agent.sinks.s3_sink1.type = hdfs
agent.sinks.s3_sink1.hdfs.codeC = gzip
agent.sinks.s3_sink1.hdfs.fileType = CompressedStream
#agent.sinks.s3_sink1.hdfs.fileType = DataStream
agent.sinks.s3_sink1.hdfs.path = {s3_path partition by host}
agent.sinks.s3_sink1.hdfs.filePrefix = %Y%m%d%H%M
#agent.sinks.s3_sink1.hdfs.fileSuffix = .log.gz
agent.sinks.s3_sink1.hdfs.callTimeout = 20000
agent.sinks.s3_sink1.hdfs.rollSize = 0
agent.sinks.s3_sink1.hdfs.rollCount = 0
agent.sinks.s3_sink1.hdfs.rollInterval = 300
agent.sinks.s3_sink1.hdfs.batchSize = 500000
#agent.sinks.s3_sink1.hdfs.round = true
#agent.sinks.s3_sink1.hdfs.roundUnit = minute
#agent.sinks.s3_sink1.hdfs.roundValue = 30
#agent.sinks.s3_sink1.hdfs.rollTimerPoolSize = 5
#agent.sinks.s3_sink1.hdfs.threadsPoolSize = 20
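
Related question while I'm at it: with batchSize = 500000, could a single
flush (or the gzip close-and-upload to S3) run longer than hdfs.callTimeout =
20000 ms? If so, I'd presumably want something like this (value hypothetical):

# sketch: give slow flushes/uploads more than 20s per HDFS call
agent.sinks.s3_sink1.hdfs.callTimeout = 60000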

agent.sinks.s3_sink2.type = hdfs
agent.sinks.s3_sink2.hdfs.codeC = gzip
agent.sinks.s3_sink2.hdfs.fileType = CompressedStream
agent.sinks.s3_sink2.hdfs.path = {s3_path partition by host}
agent.sinks.s3_sink2.hdfs.filePrefix = %Y%m%d%H%M
agent.sinks.s3_sink2.hdfs.callTimeout = 20000
agent.sinks.s3_sink2.hdfs.rollSize = 0
agent.sinks.s3_sink2.hdfs.rollCount = 0
agent.sinks.s3_sink2.hdfs.rollInterval = 300
agent.sinks.s3_sink2.hdfs.batchSize = 500000
#agent.sinks.s3_sink2.hdfs.round = true
#agent.sinks.s3_sink2.hdfs.roundUnit = minute
#agent.sinks.s3_sink2.hdfs.roundValue = 15
#agent.sinks.s3_sink2.hdfs.rollTimerPoolSize = 5
#agent.sinks.s3_sink2.hdfs.threadsPoolSize = 20

agent.sinks.s3_sink3.type = hdfs
agent.sinks.s3_sink3.hdfs.codeC = gzip
agent.sinks.s3_sink3.hdfs.fileType = CompressedStream
agent.sinks.s3_sink3.hdfs.path = {s3_path partition by host}
agent.sinks.s3_sink3.hdfs.filePrefix = %Y%m%d%H%M
agent.sinks.s3_sink3.hdfs.callTimeout = 20000
agent.sinks.s3_sink3.hdfs.rollSize = 0
agent.sinks.s3_sink3.hdfs.rollCount = 0
agent.sinks.s3_sink3.hdfs.rollInterval = 300
agent.sinks.s3_sink3.hdfs.batchSize = 500000
#agent.sinks.s3_sink3.hdfs.round = true
##agent.sinks.s3_sink3.hdfs.roundUnit = minute
##agent.sinks.s3_sink3.hdfs.roundValue = 15
#agent.sinks.s3_sink3.hdfs.rollTimerPoolSize = 5
#agent.sinks.s3_sink3.hdfs.threadsPoolSize = 20

agent.sinks.s3_sink4.type = hdfs
agent.sinks.s3_sink4.hdfs.codeC = gzip
agent.sinks.s3_sink4.hdfs.fileType = CompressedStream
agent.sinks.s3_sink4.hdfs.path = {s3_path partition by host}
agent.sinks.s3_sink4.hdfs.filePrefix = %Y%m%d%H%M
agent.sinks.s3_sink4.hdfs.callTimeout = 20000
agent.sinks.s3_sink4.hdfs.rollSize = 0
agent.sinks.s3_sink4.hdfs.rollCount = 0
agent.sinks.s3_sink4.hdfs.rollInterval = 300
agent.sinks.s3_sink4.hdfs.batchSize = 500000
#agent.sinks.s3_sink4.hdfs.round = true
#agent.sinks.s3_sink4.hdfs.roundUnit = minute
#agent.sinks.s3_sink4.hdfs.roundValue = 15
#agent.sinks.s3_sink4.hdfs.rollTimerPoolSize = 5
#agent.sinks.s3_sink4.hdfs.threadsPoolSize = 20

agent.sinks.s3_sink5.type = hdfs
agent.sinks.s3_sink5.hdfs.codeC = gzip
agent.sinks.s3_sink5.hdfs.fileType = CompressedStream
agent.sinks.s3_sink5.hdfs.path = {s3_path partition by host}
agent.sinks.s3_sink5.hdfs.filePrefix = %Y%m%d%H%M
agent.sinks.s3_sink5.hdfs.callTimeout = 20000
agent.sinks.s3_sink5.hdfs.rollSize = 0
agent.sinks.s3_sink5.hdfs.rollCount = 0
agent.sinks.s3_sink5.hdfs.rollInterval = 300
agent.sinks.s3_sink5.hdfs.batchSize = 500000
#agent.sinks.s3_sink5.hdfs.round = true
#agent.sinks.s3_sink5.hdfs.roundUnit = minute
#agent.sinks.s3_sink5.hdfs.roundValue = 15
#agent.sinks.s3_sink5.hdfs.rollTimerPoolSize = 20
#agent.sinks.s3_sink5.hdfs.threadsPoolSize = 100

agent.sinks.s3_sink6.type = hdfs
agent.sinks.s3_sink6.hdfs.codeC = gzip
agent.sinks.s3_sink6.hdfs.fileType = CompressedStream
agent.sinks.s3_sink6.hdfs.path = {s3_path partition by host}
agent.sinks.s3_sink6.hdfs.filePrefix = %Y%m%d%H%M
agent.sinks.s3_sink6.hdfs.callTimeout = 20000
agent.sinks.s3_sink6.hdfs.rollSize = 0
agent.sinks.s3_sink6.hdfs.rollCount = 0
agent.sinks.s3_sink6.hdfs.rollInterval = 300
agent.sinks.s3_sink6.hdfs.batchSize = 500000
#agent.sinks.s3_sink6.hdfs.round = true
##agent.sinks.s3_sink6.hdfs.roundUnit = minute
##agent.sinks.s3_sink6.hdfs.roundValue = 15
##agent.sinks.s3_sink6.hdfs.rollTimerPoolSize = 20
##agent.sinks.s3_sink6.hdfs.threadsPoolSize = 100

agent.sinks.s3_sink7.type = hdfs
agent.sinks.s3_sink7.hdfs.codeC = gzip
agent.sinks.s3_sink7.hdfs.fileType = CompressedStream
agent.sinks.s3_sink7.hdfs.path = {s3_path partition by host}
agent.sinks.s3_sink7.hdfs.filePrefix = %Y%m%d%H%M
agent.sinks.s3_sink7.hdfs.callTimeout = 20000
agent.sinks.s3_sink7.hdfs.rollSize = 0
agent.sinks.s3_sink7.hdfs.rollCount = 0
agent.sinks.s3_sink7.hdfs.rollInterval = 300
agent.sinks.s3_sink7.hdfs.batchSize = 500000
#agent.sinks.s3_sink7.hdfs.round = true
#agent.sinks.s3_sink7.hdfs.roundUnit = minute
#agent.sinks.s3_sink7.hdfs.roundValue = 15
#agent.sinks.s3_sink7.hdfs.rollTimerPoolSize = 20
#agent.sinks.s3_sink7.hdfs.threadsPoolSize = 100

agent.sinks.s3_sink8.type = hdfs
agent.sinks.s3_sink8.hdfs.codeC = gzip
agent.sinks.s3_sink8.hdfs.fileType = CompressedStream
agent.sinks.s3_sink8.hdfs.path = {s3_path partition by host}
agent.sinks.s3_sink8.hdfs.filePrefix = %Y%m%d%H%M
agent.sinks.s3_sink8.hdfs.callTimeout = 20000
agent.sinks.s3_sink8.hdfs.rollSize = 0
agent.sinks.s3_sink8.hdfs.rollCount = 0
agent.sinks.s3_sink8.hdfs.rollInterval = 300
agent.sinks.s3_sink8.hdfs.batchSize = 500000
#agent.sinks.s3_sink8.hdfs.round = true
#agent.sinks.s3_sink8.hdfs.roundUnit = minute
#agent.sinks.s3_sink8.hdfs.roundValue = 15
#agent.sinks.s3_sink8.hdfs.rollTimerPoolSize = 20
#agent.sinks.s3_sink8.hdfs.threadsPoolSize = 100


# Binding
agent.sources.default_avro_source.channels = s3_channel
agent.sinks.s3_sink1.channel = s3_channel
agent.sinks.s3_sink2.channel = s3_channel
agent.sinks.s3_sink3.channel = s3_channel
agent.sinks.s3_sink4.channel = s3_channel
agent.sinks.s3_sink5.channel = s3_channel
agent.sinks.s3_sink6.channel = s3_channel
agent.sinks.s3_sink7.channel = s3_channel
agent.sinks.s3_sink8.channel = s3_channel
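
For completeness, here's the rounding variant I've been considering (currently
commented out above). If I understand the escape handling, hdfs.round rounds
the timestamp used for the %-escapes in hdfs.path, so this would put every
event from a 10-minute window into the same minute=... directory and let each
file collect more data:

# sketch: round the timestamp escapes in hdfs.path to 10-minute buckets
agent.sinks.s3_sink1.hdfs.round = true
agent.sinks.s3_sink1.hdfs.roundUnit = minute
agent.sinks.s3_sink1.hdfs.roundValue = 10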
