flume-user mailing list archives

From Hari Shreedharan <hshreedha...@cloudera.com>
Subject Re: issue with failover sinks in flume
Date Tue, 16 Sep 2014 18:15:38 GMT
Looking at your sinks, the backup sink of each channel pushes to the same
destination as the primary sink of the other channel. Your sink group
priorities are also inverted. The higher the priority value, the earlier
that sink gets picked for processing, so a sink with priority 11 is picked
before a sink with priority 1.
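
As a rough sketch (not tested against your setup), the sink group section
of Tier 1 could look something like the following. It reuses the sink and
group names from your config. The priority values 10 and 5 are arbitrary
picks of mine; all that matters is that each primary gets the larger
number. I am also assuming you want both groups active, so they are listed
on a single sinkgroups line, since a repeated key in a properties file
replaces the earlier one.

```properties
# Assumption: both groups should be active, so declare them together.
# A second "agent1tier1.sinkgroups = ..." line would replace this one.
agent1tier1.sinkgroups = grpch1 grpch2

agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01 avro-forward-ch01backup
agent1tier1.sinkgroups.grpch1.processor.type = failover
# Higher value wins: the primary sink gets the larger priority.
# The values 10 and 5 are illustrative only.
agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 10
agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 5
# Upper bound on the backoff for a failed sink, in milliseconds.
agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000

agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02 avro-forward-ch02backup
agent1tier1.sinkgroups.grpch2.processor.type = failover
agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 10
agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 5
agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
```

With priorities set this way, channelbucket01 should drain to
xx.xxx.x.106:10000 and only fail over to xx.xxx.x.29:19999 when the primary
sink fails, and the reverse should hold for channelbucket02.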



On Tue, Sep 16, 2014 at 5:55 AM, Mohit Durgapal <durgapalmohit@gmail.com>
wrote:

> We have a two-stage topology in Flume in which the first tier adds
> headers based on the hash value of a field in the event.
> The hashing logic lives in an interceptor in Tier 1 of the topology,
> which sets a header field. We then use a multiplexing selector to direct
> events to Tier 2 based on that header field.
> In the second tier we store the events locally using file_roll and
> also store the same events in HDFS.
>
> Everything works fine when we are not using the failover sinks. When we
> add the failover sink configuration in the first tier, our hashing logic
> gets overridden. That means even when all the machines in our Tier 2 are
> active and running, some events that were meant for Flume agent 1 (based
> on hashing and multiplexing) go to agent 2.
>
> We are performing this test on three machines: one machine for Tier 1
> (let's say machine A) and two machines (let's say machines B and C) for
> Tier 2. In Tier 2, machine C acts as the failover backup for the Flume
> agent on machine B, and machine B acts as the failover backup for the
> Flume agent on machine C.
>
> Any idea what could be wrong with this configuration?
>
> Below are the tier-wise configurations:
>
> *Tier 1:*
>
> agent1tier1.sources = tcpsrc
> agent1tier1.sinks = avro-forward-ch01 avro-forward-ch01backup avro-forward-ch02 avro-forward-ch02backup
>
> agent1tier1.channels = channelbucket01 channelbucket02
> agent1tier1.channels.channelbucket01.type = file
> agent1tier1.channels.channelbucket02.type = file
>
>
> agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
> agent1tier1.sources.tcpsrc.type = syslogtcp
> agent1tier1.sources.tcpsrc.port = 5149
> agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
> agent1tier1.sources.tcpsrc.interceptors=i1
> agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp
>
>
>
>
> #################### INTERCEPTOR ##############################
> agent1tier1.sources.tcpsrc.interceptors=logsintercept
>
> agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
> agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets = bucket01,bucket02
>
>
>
> #################### END OF INTERCEPTOR ##############################
>
>
>
> ####################### SELECTOR ###########################
>
> agent1tier1.sources.tcpsrc.selector.type=multiplexing
> agent1tier1.sources.tcpsrc.selector.header = bucket
> agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
> agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
> agent1tier1.sources.tcpsrc.selector.default = channelbucket01
>
> ##################### END OF SELECTOR #############################
>
>
>
> #################### CHANNELS ##############################
>
> agent1tier1.channels.channelbucket01.checkpointDir = /home/flume/channelbucket01/file-channel/checkpoint
> agent1tier1.channels.channelbucket01.dataDirs = /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data
>
> agent1tier1.channels.channelbucket02.checkpointDir = /home/flume/channelbucket02/file-channel/checkpoint
> agent1tier1.channels.channelbucket02.dataDirs = /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
>
>
> #################### CHANNELS ##############################
>
>
>
>
>
>
> ################## CHANNELS CAPACITY ############################
>
>
> agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
> agent1tier1.channels.channelbucket01.checkpointInterval = 30000
> agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
> agent1tier1.channels.channelbucket01.capacity = 10000000
>
> agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
> agent1tier1.channels.channelbucket02.checkpointInterval = 30000
> agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
> agent1tier1.channels.channelbucket02.capacity = 10000000
>
>
> ################## END OF CHANNELS CAPACITY ############################
>
>
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
> agent1tier1.sinks.avro-forward-ch01.type = avro
> agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
> agent1tier1.sinks.avro-forward-ch01.port = 10000
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
> agent1tier1.sinks.avro-forward-ch01backup.type = avro
> agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
> agent1tier1.sinks.avro-forward-ch01backup.port = 19999
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
> agent1tier1.sinks.avro-forward-ch02.type = avro
> agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
> agent1tier1.sinks.avro-forward-ch02.port = 19999
>
>  # avro sink properties
> agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
> agent1tier1.sinks.avro-forward-ch02backup.type = avro
> agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
> agent1tier1.sinks.avro-forward-ch02backup.port = 10000
>
>
>
> agent1tier1.sinkgroups = grpch1
> agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01 avro-forward-ch01backup
> agent1tier1.sinkgroups.grpch1.processor.type = failover
> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 0
> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 10
> agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000
>
>
>
> agent1tier1.sinkgroups = grpch2
> agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02 avro-forward-ch02backup
> agent1tier1.sinkgroups.grpch2.processor.type = failover
> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 1
> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 11
> agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
>
>
>
> *Tier 2:*
>
> tier2.sources  = avro-AppSrv-source
> tier2.sinks = impsink convsink clksink rtsink hdfssinkrt hdfssinkdel
> tier2.channels = channelconv channelimp channelclk channelrt channelhdfsrt channelhdfsdel
> tier2.channels.channelimp.type = file
> tier2.channels.channelconv.type = file
> tier2.channels.channelclk.type = file
> tier2.channels.channelrt.type = file
> tier2.channels.channelhdfsrt.type = file
> tier2.channels.channelhdfsdel.type = file
>
> # For each source, channel, and sink, set
> # standard properties.
> # properties of avro-AppSrv-source
> tier2.sources.avro-AppSrv-source.channels = channelconv channelimp channelclk channelrt channelhdfsrt channelhdfsdel
> tier2.sources.avro-AppSrv-source.type = avro
> tier2.sources.avro-AppSrv-source.bind = xx.xxx.x.106
> tier2.sources.avro-AppSrv-source.port = 10000
>
>
>
>
>
>
> tier2.sources.avro-AppSrv-source.selector.type=multiplexing
> tier2.sources.avro-AppSrv-source.selector.header = rectype
> tier2.sources.avro-AppSrv-source.selector.mapping.IMP = channelimp channelhdfsdel
> tier2.sources.avro-AppSrv-source.selector.mapping.CLK = channelclk channelhdfsdel
> tier2.sources.avro-AppSrv-source.selector.mapping.CONV = channelconv channelhdfsdel
>
> tier2.sources.avro-AppSrv-source.selector.mapping.RT = channelrt channelhdfsrt
>
>
> tier2.sources.avro-AppSrv-source.selector.default = channelhdfsdel
>
>
>
> tier2.sinks.impsink.type = file_roll
> tier2.sinks.impsink.channel = channelimp
> tier2.sinks.impsink.sink.directory = /var/log/flume/imp
> tier2.sinks.impsink.sink.rollInterval=60
>
> tier2.sinks.convsink.type = file_roll
> tier2.sinks.convsink.channel = channelconv
> tier2.sinks.convsink.sink.directory = /var/log/flume/conv
> tier2.sinks.convsink.sink.rollInterval=60
>
> tier2.sinks.clksink.type = file_roll
> tier2.sinks.clksink.channel = channelclk
> tier2.sinks.clksink.sink.directory = /var/log/flume/clk
> tier2.sinks.clksink.sink.rollInterval=60
>
>
> tier2.sinks.rtsink.type = file_roll
> tier2.sinks.rtsink.channel = channelrt
> tier2.sinks.rtsink.sink.directory = /var/log/flume/rt
> tier2.sinks.rtsink.sink.rollInterval=60
>
>
> #################### CHANNELS ##############################
>
> tier2.channels.channelimp.checkpointDir = /home/flume/channelimp/file-channel/checkpoint
> tier2.channels.channelimp.dataDirs = /home/flume/channelimp/file-channel/1/data,/home/flume/channelimp/file-channel/2/data,/home/flume/channelimp/file-channel/3/data
>
> tier2.channels.channelclk.checkpointDir = /home/flume/channelclk/file-channel/checkpoint
> tier2.channels.channelclk.dataDirs = /home/flume/channelclk/file-channel/1/data,/home/flume/channelclk/file-channel/2/data,/home/flume/channelclk/file-channel/3/data
>
> tier2.channels.channelconv.checkpointDir = /home/flume/channelconv/file-channel/checkpoint
> tier2.channels.channelconv.dataDirs = /home/flume/channelconv/file-channel/1/data,/home/flume/channelconv/file-channel/2/data,/home/flume/channelconv/file-channel/3/data
>
> tier2.channels.channelrt.checkpointDir = /home/flume/channelrt/file-channel/checkpoint
> tier2.channels.channelrt.dataDirs = /home/flume/channelrt/file-channel/1/data,/home/flume/channelrt/file-channel/2/data,/home/flume/channelrt/file-channel/3/data
>
> tier2.channels.channelhdfsrt.checkpointDir = /home/flume/channelhdfsrt/file-channel/checkpoint
> tier2.channels.channelhdfsrt.dataDirs = /home/flume/channelhdfsrt/file-channel/1/data,/home/flume/channelhdfsrt/file-channel/2/data,/home/flume/channelhdfsrt/file-channel/3/data
>
> tier2.channels.channelhdfsdel.checkpointDir = /home/flume/channelhdfsdel/file-channel/checkpoint
> tier2.channels.channelhdfsdel.dataDirs = /home/flume/channelhdfsdel/file-channel/1/data,/home/flume/channelhdfsdel/file-channel/2/data,/home/flume/channelhdfsdel/file-channel/3/data
>
>
>
>
> #################### CHANNELS ##############################
>
>
> tier2.sinks.hdfssinkrt.type = hdfs
> tier2.sinks.hdfssinkrt.channel = channelhdfsrt
> tier2.sinks.hdfssinkrt.hdfs.path = hdfs://xx.xxx.x.102/user/dataplat/adslogs/rt/year=%Y/month=%m/day=%d/hour=%H
> tier2.sinks.hdfssinkrt.hdfs.codeC = gzip
> tier2.sinks.hdfssinkrt.hdfs.fileType = CompressedStream
> tier2.sinks.hdfssinkrt.hdfs.filePrefix = FlumeSinkhdfsrt
> # Roll based on the block size only
> tier2.sinks.hdfssinkrt.hdfs.rollCount= 200000
> tier2.sinks.hdfssinkrt.hdfs.rollInterval=120
> tier2.sinks.hdfssinkrt.hdfs.rollSize = 0
> # seconds to wait before closing the file.
> #tier2.sinks.hdfssinkrt.hdfs.idleTimeout = 60
> tier2.sinks.hdfssinkrt.hdfs.batchSize=20000
> tier2.sinks.hdfssinkrt.hdfs.txnEventMax=20000
> #tier2.sinks.hdfssinkrt.hdfs.threadsPoolSize=20
> tier2.sinks.hdfssinkrt.hdfs.useLocalTimeStamp = false
> tier2.sinks.hdfssinkrt.hdfs.callTimeout = 30000
>
>
> tier2.sinks.hdfssinkdel.type = hdfs
> tier2.sinks.hdfssinkdel.channel = channelhdfsdel
> tier2.sinks.hdfssinkdel.hdfs.path = hdfs://xx.xxx.x.102/user/dataplat/adslogs/del/year=%Y/month=%m/day=%d/hour=%H
> tier2.sinks.hdfssinkdel.hdfs.codeC = gzip
> tier2.sinks.hdfssinkdel.hdfs.fileType = CompressedStream
> tier2.sinks.hdfssinkdel.hdfs.filePrefix = FlumeSinkhdfsdel
> # Roll based on the block size only
> tier2.sinks.hdfssinkdel.hdfs.rollCount= 200000
> tier2.sinks.hdfssinkdel.hdfs.rollInterval=120
> tier2.sinks.hdfssinkdel.hdfs.rollSize = 0
> # seconds to wait before closing the file.
> #tier2.sinks.hdfssinkdel.hdfs.idleTimeout = 60
> tier2.sinks.hdfssinkdel.hdfs.batchSize=20000
> tier2.sinks.hdfssinkdel.hdfs.txnEventMax=20000
> #tier2.sinks.hdfssinkdel.hdfs.threadsPoolSize=20
> tier2.sinks.hdfssinkdel.hdfs.useLocalTimeStamp = false
> tier2.sinks.hdfssinkdel.hdfs.callTimeout = 30000
> #################### END OF SINKS ##############################