flume-user mailing list archives

From Mohit Durgapal <durgapalmo...@gmail.com>
Subject issue with failover sinks in flume
Date Tue, 16 Sep 2014 12:55:03 GMT
We have a two-tier Flume topology. In the first tier, a custom interceptor
computes a hash of a field in each event and sets a header field based on
that hash. A multiplexing channel selector then uses that header to direct
each event to the appropriate Tier 2 agent. In the second tier, we store the
events locally using file_roll sinks and also write the same events to HDFS.
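A minimal sketch of the Tier 1 bucketing step, in Java since Flume interceptors are written in Java. The bucket names (`bucket01`, `bucket02`) and the header name (`bucket`) come from the Tier 1 configuration below. The hashed field and the use of `String.hashCode()` are assumptions, because the message does not show the source of the `eventTweaker` interceptor. A real interceptor would implement Flume's `org.apache.flume.interceptor.Interceptor` interface; this sketch keeps only the hashing logic so it runs standalone, and `HashBucketSketch` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of hash-based bucketing; not the actual eventTweaker code. */
public class HashBucketSketch {

    // Bucket names from the hashbuckets property in the Tier 1 config.
    static final List<String> BUCKETS = Arrays.asList("bucket01", "bucket02");

    // Header read by the Tier 1 multiplexing selector (selector.header = bucket).
    static final String BUCKET_HEADER = "bucket";

    /** Deterministically maps a field value onto one of the configured buckets. */
    static String bucketFor(String fieldValue) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        int index = Math.floorMod(fieldValue.hashCode(), BUCKETS.size());
        return BUCKETS.get(index);
    }

    /** Returns a copy of the event headers with the bucket header set. */
    static Map<String, String> withBucketHeader(Map<String, String> headers, String fieldValue) {
        Map<String, String> out = new HashMap<>(headers);
        out.put(BUCKET_HEADER, bucketFor(fieldValue));
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> headers = withBucketHeader(new HashMap<>(), "user-42");
        System.out.println(headers);
    }
}
```

Because the mapping is deterministic, a given field value always receives the same header, so if events reach the wrong Tier 2 host, the cause is more likely downstream of the selector than in the hash itself.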

Everything works fine when we are not using failover sinks. When we add the
failover sink configuration in the first tier, our hashing logic gets
overridden: even when all the machines in Tier 2 are up and running, some
events that were meant for Flume agent 1 (based on hashing and multiplexing)
go to agent 2.

We are performing this test on three machines: one machine for Tier 1 (let's
say machine A) and two machines for Tier 2 (machines B and C). In Tier 2,
machine C acts as the failover backup for the Flume agent on machine B, and
machine B acts as the failover backup for the Flume agent on machine C.
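A simplified model of the intended failover behavior for one sink group, assuming the usual semantics: the higher-priority sink receives the channel's events while its endpoint is up, and the backup takes over only when the primary fails. Sink names and endpoints come from the Tier 1 config below; the priority values 10 and 5 are hypothetical. Per the Flume User Guide, a sink with a higher priority value is activated first, so the commented-out values in the config (0 for the primary, 10 for the backup) would actually prefer the backup sink. `FailoverSketch` is a hypothetical name, and Flume's real FailoverSinkProcessor also applies a back-off penalty (capped by `maxpenalty`) to failed sinks, which is not modeled here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of one failover sink group; not Flume's implementation. */
public class FailoverSketch {

    static final class Sink {
        final String name;
        final String endpoint;
        final int priority; // higher value = tried first

        Sink(String name, String endpoint, int priority) {
            this.name = name;
            this.endpoint = endpoint;
            this.priority = priority;
        }
    }

    /** Returns the name of the preferred live sink, or null if all are down. */
    static String choose(List<Sink> group, Set<String> downEndpoints) {
        Sink best = null;
        for (Sink s : group) {
            if (downEndpoints.contains(s.endpoint)) {
                continue; // skip sinks whose Tier 2 agent is unreachable
            }
            if (best == null || s.priority > best.priority) {
                best = s;
            }
        }
        return best == null ? null : best.name;
    }

    // Group for bucket01 as intended in the Tier 1 config; priorities are hypothetical.
    static List<Sink> grpch1() {
        return Arrays.asList(
                new Sink("avro-forward-ch01", "xx.xxx.x.106:10000", 10),
                new Sink("avro-forward-ch01backup", "xx.xxx.x.29:19999", 5));
    }

    public static void main(String[] args) {
        Set<String> none = Collections.emptySet();
        Set<String> primaryDown = new HashSet<>(Arrays.asList("xx.xxx.x.106:10000"));
        System.out.println(choose(grpch1(), none));        // prints avro-forward-ch01
        System.out.println(choose(grpch1(), primaryDown)); // prints avro-forward-ch01backup
    }
}
```

In this model nothing reaches the backup while the primary is up; if events do, the two sinks are probably not being treated as a single failover group.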

Any idea what could be wrong with this configuration?

Below are the configurations for each tier:

*Tier 1:*

agent1tier1.sources = tcpsrc
agent1tier1.sinks = avro-forward-ch01 avro-forward-ch01backup avro-forward-ch02 avro-forward-ch02backup

agent1tier1.channels = channelbucket01 channelbucket02
agent1tier1.channels.channelbucket01.type = file
agent1tier1.channels.channelbucket02.type = file


agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
agent1tier1.sources.tcpsrc.type = syslogtcp
agent1tier1.sources.tcpsrc.port = 5149
agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
agent1tier1.sources.tcpsrc.interceptors=i1
agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp




#################### INTERCEPTOR ##############################
agent1tier1.sources.tcpsrc.interceptors=logsintercept
agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets = bucket01,bucket02



#################### END OF INTERCEPTOR ##############################



####################### SELECTOR ###########################

agent1tier1.sources.tcpsrc.selector.type=multiplexing
agent1tier1.sources.tcpsrc.selector.header = bucket
agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
agent1tier1.sources.tcpsrc.selector.default = channelbucket01

##################### END OF SELECTOR #############################



#################### CHANNELS ##############################

agent1tier1.channels.channelbucket01.checkpointDir = /home/flume/channelbucket01/file-channel/checkpoint
agent1tier1.channels.channelbucket01.dataDirs = /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data

agent1tier1.channels.channelbucket02.checkpointDir = /home/flume/channelbucket02/file-channel/checkpoint
agent1tier1.channels.channelbucket02.dataDirs = /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data


#################### END OF CHANNELS ##############################






################## CHANNELS CAPACITY ############################


agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
agent1tier1.channels.channelbucket01.checkpointInterval = 30000
agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
agent1tier1.channels.channelbucket01.capacity = 10000000

agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
agent1tier1.channels.channelbucket02.checkpointInterval = 30000
agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
agent1tier1.channels.channelbucket02.capacity = 10000000


################## END OF CHANNELS CAPACITY ############################



 # avro sink properties
agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
agent1tier1.sinks.avro-forward-ch01.type = avro
agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
agent1tier1.sinks.avro-forward-ch01.port = 10000

 # avro sink properties
agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
agent1tier1.sinks.avro-forward-ch01backup.type = avro
agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
agent1tier1.sinks.avro-forward-ch01backup.port = 19999

 # avro sink properties
agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
agent1tier1.sinks.avro-forward-ch02.type = avro
agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
agent1tier1.sinks.avro-forward-ch02.port = 19999

 # avro sink properties
agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
agent1tier1.sinks.avro-forward-ch02backup.type = avro
agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
agent1tier1.sinks.avro-forward-ch02backup.port = 10000



agent1tier1.sinkgroups = grpch1
agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01 avro-forward-ch01backup
agent1tier1.sinkgroups.grpch1.processor.type = failover
#agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 0
#agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 10
agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000



agent1tier1.sinkgroups = grpch2
agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02 avro-forward-ch02backup
agent1tier1.sinkgroups.grpch2.processor.type = failover
#agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 1
#agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 11
agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
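A quick way to check the Tier 1 file for keys that are assigned more than once. The sketch assumes Flume loads the agent file with `java.util.Properties` (an assumption worth confirming for your Flume version); with `Properties`, a repeated key silently keeps only its last value. `DuplicateKeySketch` and its methods are hypothetical helpers, and the parser is deliberately simple: it ignores `:` separators and backslash line continuations.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helpers for spotting repeated keys in a Flume agent file. */
public class DuplicateKeySketch {

    /** Returns keys assigned more than once, in order of first appearance. */
    static List<String> duplicateKeys(String text) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String raw : text.split("\n")) {
            String line = raw.trim();
            // Blank lines and comment lines (# or !) carry no key.
            if (line.isEmpty() || line.startsWith("#") || line.startsWith("!")) {
                continue;
            }
            int sep = line.indexOf('=');
            if (sep < 0) {
                continue; // only key=value lines are handled in this sketch
            }
            counts.merge(line.substring(0, sep).trim(), 1, Integer::sum);
        }
        List<String> dups = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) {
                dups.add(e.getKey());
            }
        }
        return dups;
    }

    /** Loads the text with java.util.Properties, where the last assignment of a key wins. */
    static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            // StringReader does no real I/O, so this is not expected to happen.
            throw new UncheckedIOException(e);
        }
        return p;
    }

    public static void main(String[] args) {
        String tier1 = String.join("\n",
                "agent1tier1.sources.tcpsrc.interceptors=i1",
                "agent1tier1.sources.tcpsrc.interceptors=logsintercept",
                "agent1tier1.sinkgroups = grpch1",
                "agent1tier1.sinkgroups = grpch2");
        System.out.println(duplicateKeys(tier1));
        System.out.println(load(tier1).getProperty("agent1tier1.sinkgroups")); // prints grpch2
    }
}
```

Run against the Tier 1 lines above, this would flag `agent1tier1.sources.tcpsrc.interceptors` and `agent1tier1.sinkgroups`. If the last-value-wins assumption holds, only `grpch2` is declared as a sink group, so `avro-forward-ch01` and `avro-forward-ch01backup` would run as two ungrouped sinks that both drain `channelbucket01`, sending bucket01 events to both Tier 2 hosts. Listing both groups on one line (`agent1tier1.sinkgroups = grpch1 grpch2`), and likewise `interceptors = i1 logsintercept`, avoids the override. The dropped timestamp interceptor may also matter for the `%Y/%m/%d/%H` escapes in the Tier 2 HDFS paths when `useLocalTimeStamp = false`, unless `eventTweaker` sets a timestamp header itself.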



*Tier 2:*

tier2.sources  = avro-AppSrv-source
tier2.sinks = impsink convsink clksink rtsink hdfssinkrt hdfssinkdel
tier2.channels = channelconv channelimp channelclk channelrt channelhdfsrt channelhdfsdel
tier2.channels.channelimp.type = file
tier2.channels.channelconv.type = file
tier2.channels.channelclk.type = file
tier2.channels.channelrt.type = file
tier2.channels.channelhdfsrt.type = file
tier2.channels.channelhdfsdel.type = file

# For each source, channel, and sink, set
# standard properties.
# properties of avro-AppSrv-source
tier2.sources.avro-AppSrv-source.channels = channelconv channelimp channelclk channelrt channelhdfsrt channelhdfsdel
tier2.sources.avro-AppSrv-source.type = avro
tier2.sources.avro-AppSrv-source.bind = xx.xxx.x.106
tier2.sources.avro-AppSrv-source.port = 10000






tier2.sources.avro-AppSrv-source.selector.type=multiplexing
tier2.sources.avro-AppSrv-source.selector.header = rectype
tier2.sources.avro-AppSrv-source.selector.mapping.IMP = channelimp channelhdfsdel
tier2.sources.avro-AppSrv-source.selector.mapping.CLK = channelclk channelhdfsdel
tier2.sources.avro-AppSrv-source.selector.mapping.CONV = channelconv channelhdfsdel

tier2.sources.avro-AppSrv-source.selector.mapping.RT = channelrt channelhdfsrt


tier2.sources.avro-AppSrv-source.selector.default = channelhdfsdel



tier2.sinks.impsink.type = file_roll
tier2.sinks.impsink.channel = channelimp
tier2.sinks.impsink.sink.directory = /var/log/flume/imp
tier2.sinks.impsink.sink.rollInterval=60

tier2.sinks.convsink.type = file_roll
tier2.sinks.convsink.channel = channelconv
tier2.sinks.convsink.sink.directory = /var/log/flume/conv
tier2.sinks.convsink.sink.rollInterval=60

tier2.sinks.clksink.type = file_roll
tier2.sinks.clksink.channel = channelclk
tier2.sinks.clksink.sink.directory = /var/log/flume/clk
tier2.sinks.clksink.sink.rollInterval=60


tier2.sinks.rtsink.type = file_roll
tier2.sinks.rtsink.channel = channelrt
tier2.sinks.rtsink.sink.directory = /var/log/flume/rt
tier2.sinks.rtsink.sink.rollInterval=60


#################### CHANNELS ##############################

tier2.channels.channelimp.checkpointDir = /home/flume/channelimp/file-channel/checkpoint
tier2.channels.channelimp.dataDirs = /home/flume/channelimp/file-channel/1/data,/home/flume/channelimp/file-channel/2/data,/home/flume/channelimp/file-channel/3/data


tier2.channels.channelclk.checkpointDir = /home/flume/channelclk/file-channel/checkpoint
tier2.channels.channelclk.dataDirs = /home/flume/channelclk/file-channel/1/data,/home/flume/channelclk/file-channel/2/data,/home/flume/channelclk/file-channel/3/data

tier2.channels.channelconv.checkpointDir = /home/flume/channelconv/file-channel/checkpoint
tier2.channels.channelconv.dataDirs = /home/flume/channelconv/file-channel/1/data,/home/flume/channelconv/file-channel/2/data,/home/flume/channelconv/file-channel/3/data

tier2.channels.channelrt.checkpointDir = /home/flume/channelrt/file-channel/checkpoint
tier2.channels.channelrt.dataDirs = /home/flume/channelrt/file-channel/1/data,/home/flume/channelrt/file-channel/2/data,/home/flume/channelrt/file-channel/3/data

tier2.channels.channelhdfsrt.checkpointDir = /home/flume/channelhdfsrt/file-channel/checkpoint
tier2.channels.channelhdfsrt.dataDirs = /home/flume/channelhdfsrt/file-channel/1/data,/home/flume/channelhdfsrt/file-channel/2/data,/home/flume/channelhdfsrt/file-channel/3/data

tier2.channels.channelhdfsdel.checkpointDir = /home/flume/channelhdfsdel/file-channel/checkpoint
tier2.channels.channelhdfsdel.dataDirs = /home/flume/channelhdfsdel/file-channel/1/data,/home/flume/channelhdfsdel/file-channel/2/data,/home/flume/channelhdfsdel/file-channel/3/data




#################### END OF CHANNELS ##############################


tier2.sinks.hdfssinkrt.type = hdfs
tier2.sinks.hdfssinkrt.channel = channelhdfsrt
tier2.sinks.hdfssinkrt.hdfs.path = hdfs://xx.xxx.x.102/user/dataplat/adslogs/rt/year=%Y/month=%m/day=%d/hour=%H
tier2.sinks.hdfssinkrt.hdfs.codeC = gzip
tier2.sinks.hdfssinkrt.hdfs.fileType = CompressedStream
tier2.sinks.hdfssinkrt.hdfs.filePrefix = FlumeSinkhdfsrt
# Roll after rollCount events or rollInterval seconds; rollSize = 0 disables size-based rolling
tier2.sinks.hdfssinkrt.hdfs.rollCount= 200000
tier2.sinks.hdfssinkrt.hdfs.rollInterval=120
tier2.sinks.hdfssinkrt.hdfs.rollSize = 0
# seconds to wait before closing the file.
#tier2.sinks.hdfssinkrt.hdfs.idleTimeout = 60
tier2.sinks.hdfssinkrt.hdfs.batchSize=20000
tier2.sinks.hdfssinkrt.hdfs.txnEventMax=20000
#tier2.sinks.hdfssinkrt.hdfs.threadsPoolSize=20
tier2.sinks.hdfssinkrt.hdfs.useLocalTimeStamp = false
tier2.sinks.hdfssinkrt.hdfs.callTimeout = 30000


tier2.sinks.hdfssinkdel.type = hdfs
tier2.sinks.hdfssinkdel.channel = channelhdfsdel
tier2.sinks.hdfssinkdel.hdfs.path = hdfs://xx.xxx.x.102/user/dataplat/adslogs/del/year=%Y/month=%m/day=%d/hour=%H
tier2.sinks.hdfssinkdel.hdfs.codeC = gzip
tier2.sinks.hdfssinkdel.hdfs.fileType = CompressedStream
tier2.sinks.hdfssinkdel.hdfs.filePrefix = FlumeSinkhdfsdel
# Roll after rollCount events or rollInterval seconds; rollSize = 0 disables size-based rolling
tier2.sinks.hdfssinkdel.hdfs.rollCount= 200000
tier2.sinks.hdfssinkdel.hdfs.rollInterval=120
tier2.sinks.hdfssinkdel.hdfs.rollSize = 0
# seconds to wait before closing the file.
#tier2.sinks.hdfssinkdel.hdfs.idleTimeout = 60
tier2.sinks.hdfssinkdel.hdfs.batchSize=20000
tier2.sinks.hdfssinkdel.hdfs.txnEventMax=20000
#tier2.sinks.hdfssinkdel.hdfs.threadsPoolSize=20
tier2.sinks.hdfssinkdel.hdfs.useLocalTimeStamp = false
tier2.sinks.hdfssinkdel.hdfs.callTimeout = 30000
#################### END OF SINKS ##############################
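A simplified sketch of how a multiplexing channel selector resolves a header value, using the Tier 2 mappings above: each mapping lists one or more space-separated channels, and a missing or unmapped `rectype` falls back to `selector.default`. This is an illustration rather than Flume's own selector class, which has options not modeled here; `MultiplexingSketch` is a hypothetical name. The Tier 1 selector follows the same pattern, with `bucket` as the header and `channelbucket01` as the default.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a multiplexing channel selector. */
public class MultiplexingSketch {

    private final String header;
    private final List<String> defaults;
    private final Map<String, List<String>> mapping = new HashMap<>();

    MultiplexingSketch(String header, String defaultChannels) {
        this.header = header;
        this.defaults = split(defaultChannels);
    }

    /** Mirrors one selector.mapping.<value> = <channels> line. */
    MultiplexingSketch map(String headerValue, String channels) {
        mapping.put(headerValue, split(channels));
        return this;
    }

    /** Channels an event is written to, based on its header value. */
    List<String> channelsFor(Map<String, String> eventHeaders) {
        String value = eventHeaders.get(header);
        // Missing header or unmapped value: fall back to selector.default.
        return value == null ? defaults : mapping.getOrDefault(value, defaults);
    }

    private static List<String> split(String channels) {
        return Arrays.asList(channels.trim().split("\\s+"));
    }

    /** The Tier 2 selector from the configuration above. */
    static MultiplexingSketch tier2() {
        return new MultiplexingSketch("rectype", "channelhdfsdel")
                .map("IMP", "channelimp channelhdfsdel")
                .map("CLK", "channelclk channelhdfsdel")
                .map("CONV", "channelconv channelhdfsdel")
                .map("RT", "channelrt channelhdfsrt");
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("rectype", "RT");
        System.out.println(tier2().channelsFor(headers)); // prints [channelrt, channelhdfsrt]
    }
}
```

One consequence visible in the model: an event with a missing or unrecognized `rectype` goes only to `channelhdfsdel`, so it reaches HDFS but none of the file_roll directories.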
