flume-user mailing list archives

From "Kumar, Suresh" <Suresh.Kum...@emc.com>
Subject RE: Flume Source and Sink in different hosts
Date Fri, 05 Oct 2012 18:27:09 GMT
Just a quick update: this is definitely an issue on the source side and has nothing to do with the Flume
configuration of the sink.

 

Restarting the sink agent makes no difference: I still do not see the data in HBase. Stopping the
source agent does not make any data appear either. But as soon as I start the source agent again,
I see the data in my HBase, which is on HostB.

 

Thanks for any help,

Suresh

 

 

From: Kumar, Suresh [mailto:Suresh.Kumar4@emc.com] 
Sent: Friday, October 05, 2012 11:08 AM
To: user@flume.apache.org
Subject: RE: Flume Source and Sink in different hosts

 

I increased the heap size on both the source and the sink to 1G, and I now use the AsyncHBaseSink
in my sink agent configuration. It didn't make much of a difference.

 

I changed the channel in my source agent configuration on HostA from memory to file. I did not
change my sink agent configuration on HostB (it still uses the Memory Channel). I still see the
latency issue (BTW, auth.log grows every second). However, I noticed that if I kill the agent on
HostA (source) and restart it, I see entries in HBase. Am I missing something? How often does the
data get flushed from source to sink? Should the sink agent also use the same channel type (file)?
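
One setting that may bear on the flush question, offered as an assumption rather than a confirmed diagnosis: the exec source has a batchSize property that controls how many lines it reads and sends to the channel at a time. If the build in use only commits full batches, lines from a slowly growing file could sit in the source until the agent stops. A minimal sketch for agent3 that lowers it for testing (the value 1 is purely illustrative):

```properties
# Hypothetical test tweak; not part of the configuration posted in this thread.
# batchSize: max number of lines the exec source reads and sends to the channel at a time.
agent3.sources.tail.batchSize = 1
```

If entries then reach HBase without restarting the agent, batching in the source is a plausible cause. A larger value can be restored once throughput matters more than latency.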

 

Here is my conf and log for HostA (source)

 

flume.conf (source)

 

agent3.sources = tail

agent3.channels = FileChannel-1

agent3.sinks = avro-sink

 

# Define source flow

agent3.sources.tail.type = exec

agent3.sources.tail.command = tail -F /var/log/auth.log

agent3.sources.tail.channels = FileChannel-1

 

# What kind of channel

agent3.channels.FileChannel-1.type = file

agent3.channels.FileChannel-1.checkpointDir = /tmp/checkpoint

agent3.channels.FileChannel-1.dataDirs = /tmp/data

 

# avro sink properties

agent3.sinks.avro-sink.type = avro

agent3.sinks.avro-sink.channel = FileChannel-1

agent3.sinks.avro-sink.hostname = sig-flume

agent3.sinks.avro-sink.port = 41414

 

 

Log (source)

 

 

2012-10-05 10:49:03,736 (main) [INFO - org.apache.flume.node.FlumeNode.start(FlumeNode.java:54)]
Flume node starting - agent3

2012-10-05 10:49:03,752 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:203)]
Node manager starting

2012-10-05 10:49:03,752 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:67)]
Configuration provider starting

2012-10-05 10:49:03,760 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:67)]
Starting lifecycle supervisor 12

2012-10-05 10:49:03,763 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:207)]
Node manager started

2012-10-05 10:49:03,767 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:86)]
Configuration provider started

2012-10-05 10:49:03,769 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf for changes

2012-10-05 10:49:03,772 (conf-file-poller-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:195)]
Reloading configuration file:conf/flume.conf

2012-10-05 10:49:03,801 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:912)]
Added sinks: avro-sink Agent: agent3

2012-10-05 10:49:03,802 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

2012-10-05 10:49:03,803 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1002)]
Created context for avro-sink: hostname

2012-10-05 10:49:03,803 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

2012-10-05 10:49:03,803 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

2012-10-05 10:49:03,804 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

2012-10-05 10:49:03,805 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:295)]
Starting validation of configuration for agent: agent3, initial-configuration: AgentConfiguration[agent3]

SOURCES: {tail={ parameters:{command=tail -F /var/log/auth.log, channels=FileChannel-1, type=exec}
}}

CHANNELS: {FileChannel-1={ parameters:{checkpointDir=/tmp/checkpoint, dataDirs=/tmp/data,
type=file} }}

SINKS: {avro-sink={ parameters:{port=41414, hostname=sig-flume, type=avro, channel=FileChannel-1}
}}

 

2012-10-05 10:49:03,823 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:450)]
Created channel FileChannel-1

2012-10-05 10:49:03,850 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:655)]
Creating sink: avro-sink using AVRO

2012-10-05 10:49:03,860 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:353)]
Post validation configuration for agent3

AgentConfiguration created without Configuration stubs for which only basic syntactical validation
was performed[agent3]

SOURCES: {tail={ parameters:{command=tail -F /var/log/auth.log, channels=FileChannel-1, type=exec}
}}

CHANNELS: {FileChannel-1={ parameters:{checkpointDir=/tmp/checkpoint, dataDirs=/tmp/data,
type=file} }}

SINKS: {avro-sink={ parameters:{port=41414, hostname=sig-flume, type=avro, channel=FileChannel-1}
}}

 

2012-10-05 10:49:03,860 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:117)]
Channels:FileChannel-1

 

2012-10-05 10:49:03,861 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:118)]
Sinks avro-sink

 

2012-10-05 10:49:03,863 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:119)]
Sources tail

 

2012-10-05 10:49:03,864 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:122)]
Post-validation flume configuration contains configuration  for agents: [agent3]

2012-10-05 10:49:03,865 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:249)]
Creating channels

2012-10-05 10:49:03,866 (conf-file-poller-0) [DEBUG - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:68)]
Creating instance of channel FileChannel-1 type file

2012-10-05 10:49:03,899 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)]
Monitoried counter group for type: CHANNEL, name: FileChannel-1, registered successfully.

2012-10-05 10:49:03,900 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:273)]
created channel FileChannel-1

2012-10-05 10:49:03,900 (conf-file-poller-0) [DEBUG - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:74)]
Creating instance of source tail, type exec

2012-10-05 10:49:03,934 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:70)]
Creating instance of sink: avro-sink, type: avro

2012-10-05 10:49:03,945 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)]
Monitoried counter group for type: SINK, name: avro-sink, registered successfully.

2012-10-05 10:49:03,947 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:106)]
Starting new configuration:{ sourceRunners:{tail=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:tail,state:IDLE}
}} sinkRunners:{avro-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@68814013
counterGroup:{ name:null counters:{} } }} channels:{FileChannel-1=FileChannel FileChannel-1
{ dataDirs: [/tmp/data] }} }

2012-10-05 10:49:03,947 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:113)]
Starting Channel FileChannel-1

2012-10-05 10:49:03,951 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:128)]
Waiting for channel: FileChannel-1 to start. Sleeping for 500 ms

2012-10-05 10:49:03,952 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.FileChannel.start(FileChannel.java:241)]
Starting FileChannel FileChannel-1 { dataDirs: [/tmp/data] }...

2012-10-05 10:49:03,993 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.<init>(Log.java:248)]
Encryption is not enabled

2012-10-05 10:49:04,000 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:292)]
Replay started

2012-10-05 10:49:04,023 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:304)]
Found NextFileID 1, from [/tmp/data/log-1]

2012-10-05 10:49:04,039 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFileV3.<init>(EventQueueBackingStoreFileV3.java:46)]
Starting up with /tmp/checkpoint/checkpoint and /tmp/checkpoint/checkpoint.meta

2012-10-05 10:49:04,039 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFileV3.<init>(EventQueueBackingStoreFileV3.java:50)]
Reading checkpoint metadata from /tmp/checkpoint/checkpoint.meta

2012-10-05 10:49:04,100 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:336)]
Last Checkpoint Fri Oct 05 10:43:53 PDT 2012, queue depth = 0

2012-10-05 10:49:04,107 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:355)]
Replaying logs with v2 replay logic

2012-10-05 10:49:04,110 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:223)]
Starting replay of [/tmp/data/log-1]

2012-10-05 10:49:04,113 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue$InflightEventWrapper.deserialize(FlumeEventQueue.java:502)]
Reached end of inflights buffer. Long buffer position =2

2012-10-05 10:49:04,117 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue$InflightEventWrapper.deserialize(FlumeEventQueue.java:502)]
Reached end of inflights buffer. Long buffer position =2

2012-10-05 10:49:04,118 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:236)]
Replaying /tmp/data/log-1

2012-10-05 10:49:04,133 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.tools.DirectMemoryUtils.getDefaultDirectMemorySize(DirectMemoryUtils.java:113)]
Unable to get maxDirectMemory from VM: NoSuchMethodException: sun.misc.VM.maxDirectMemory(null)

2012-10-05 10:49:04,139 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.tools.DirectMemoryUtils.allocate(DirectMemoryUtils.java:47)]
Direct Memory Allocation:  Allocation = 1048576, Allocated = 0, MaxDirectMemorySize = 954466304,
Remaining = 954466304

2012-10-05 10:49:04,181 (lifecycleSupervisor-1-0) [WARN - org.apache.flume.channel.file.LogFile$SequentialReader.skipToLastCheckpointPosition(LogFile.java:431)]
Checkpoint for file(/tmp/data/log-1) is: 1349459033373, which is beyond the requested checkpoint
time: 1349459033373 and position 0

2012-10-05 10:49:04,218 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$SequentialReader.next(LogFile.java:452)]
Encountered EOF at 2423 in /tmp/data/log-1

2012-10-05 10:49:04,219 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:320)]
read: 17, put: 16, take: 0, rollback: 0, commit: 1, skip: 0, eventCount:16

2012-10-05 10:49:04,222 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:362)]
Rolling /tmp/data

2012-10-05 10:49:04,223 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.roll(Log.java:726)]
Roll start /tmp/data

2012-10-05 10:49:04,229 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$Writer.<init>(LogFile.java:138)]
Opened /tmp/data/log-2

2012-10-05 10:49:04,253 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.roll(Log.java:741)]
Roll end

2012-10-05 10:49:04,269 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:108)]
Start checkpoint for /tmp/checkpoint/checkpoint, elements to sync = 16

2012-10-05 10:49:04,306 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:116)]
Updating checkpoint metadata: logWriteOrderID: 1349459344074, queueSize: 16, queueHead: 999998

2012-10-05 10:49:04,408 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFileV3$MetaDataWriter.markCheckpoint(LogFileV3.java:85)]
Updating log-2.meta currentPosition = 0, logWriteOrderID = 1349459344074

2012-10-05 10:49:04,427 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:796)]
Updated checkpoint for file: /tmp/data/log-2 position: 0 logWriteOrderID: 1349459344074

2012-10-05 10:49:04,428 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$RandomReader.close(LogFile.java:317)]
Closing RandomReader /tmp/data/log-1

2012-10-05 10:49:04,434 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFileV3$MetaDataWriter.markCheckpoint(LogFileV3.java:85)]
Updating log-1.meta currentPosition = 0, logWriteOrderID = 1349459344074

2012-10-05 10:49:04,451 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:816)]
Updated checkpoint for file: /tmp/data/log-1logWriteOrderID 1349459344074

2012-10-05 10:49:04,451 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.FileChannel.start(FileChannel.java:264)]
Queue Size after replay: 16 [channel=FileChannel-1]

2012-10-05 10:49:04,452 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)]
Component type: CHANNEL, name: FileChannel-1 started

2012-10-05 10:49:04,453 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:141)]
Starting Sink avro-sink

2012-10-05 10:49:04,457 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:152)]
Starting Source tail

2012-10-05 10:49:04,459 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.start(AvroSink.java:239)]
Starting AvroSink avro-sink { host: sig-flume, port: 41414 }...

2012-10-05 10:49:04,460 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)]
Component type: SINK, name: avro-sink started

2012-10-05 10:49:04,462 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:155)]
Exec source starting with command:tail -F /var/log/auth.log

2012-10-05 10:49:04,462 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)]
Avro sink avro-sink: Building RpcClient with hostname: sig-flume, port: 41414

2012-10-05 10:49:04,477 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.source.ExecSource.start(ExecSource.java:173)]
Exec source started

2012-10-05 10:49:04,524 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:420)]
Batch size string = null

2012-10-05 10:49:05,341 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:192)]
Avro sink avro-sink: Created RpcClient: NettyAvroRpcClient { host: sig-flume, port: 41414
}

2012-10-05 10:49:05,342 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.start(AvroSink.java:253)]
Avro sink avro-sink started.

2012-10-05 10:49:05,345 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)]
Polling sink runner starting

2012-10-05 10:49:05,363 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.channel.file.LogFile$Writer.preallocate(LogFile.java:253)]
Preallocating at position 0

2012-10-05 10:49:07,007 (Log-BackgroundWorker-FileChannel-1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:108)]
Start checkpoint for /tmp/checkpoint/checkpoint, elements to sync = 16

2012-10-05 10:49:07,047 (Log-BackgroundWorker-FileChannel-1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:116)]
Updating checkpoint metadata: logWriteOrderID: 1349459344092, queueSize: 0, queueHead: 14

2012-10-05 10:49:07,101 (Log-BackgroundWorker-FileChannel-1) [INFO - org.apache.flume.channel.file.LogFileV3$MetaDataWriter.markCheckpoint(LogFileV3.java:85)]
Updating log-2.meta currentPosition = 692, logWriteOrderID = 1349459344092

2012-10-05 10:49:07,122 (Log-BackgroundWorker-FileChannel-1) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:796)]
Updated checkpoint for file: /tmp/data/log-2 position: 692 logWriteOrderID: 1349459344092

2012-10-05 10:49:07,122 (Log-BackgroundWorker-FileChannel-1) [DEBUG - org.apache.flume.channel.file.Log.removeOldLogs(Log.java:846)]
Files currently in use: [2]

2012-10-05 10:49:34,465 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf for changes

2012-10-05 10:49:37,124 (Log-BackgroundWorker-FileChannel-1) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue.checkpoint(FlumeEventQueue.java:99)]
Checkpoint not required

 

From: Hari Shreedharan [mailto:hshreedharan@cloudera.com] 
Sent: Thursday, October 04, 2012 4:25 PM
To: user@flume.apache.org
Subject: Re: Flume Source and Sink in different hosts

 

It depends on what kind of guarantees you need. If you need to make sure your events are persisted
even during process/system failures, you should use the File Channel, else you can use Memory
Channel (performance of Memory Channel is obviously better).  
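
As a rough illustration of the two options, here is a sketch only. The agent name, channel names, and directories are taken from the configurations posted in this thread; the capacity values are placeholders, not recommendations.

```properties
# Option 1: memory channel. Fast, but queued events are lost if the process dies.
agent3.channels = MemoryChannel-1
agent3.channels.MemoryChannel-1.type = memory
# Illustrative sizes: max events held in the channel, and max events per transaction.
agent3.channels.MemoryChannel-1.capacity = 10000
agent3.channels.MemoryChannel-1.transactionCapacity = 100

# Option 2: file channel. Events are written to disk and replayed after a restart.
# agent3.channels = FileChannel-1
# agent3.channels.FileChannel-1.type = file
# agent3.channels.FileChannel-1.checkpointDir = /tmp/checkpoint
# agent3.channels.FileChannel-1.dataDirs = /tmp/data
```

Directories under /tmp are often cleared on reboot, so a persistent location may be a better fit for the file channel if durability is the goal.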

 

Thanks,

Hari

 

-- 

Hari Shreedharan

 

On Thursday, October 4, 2012 at 3:46 PM, Kumar, Suresh wrote:

	 

	Hari, I just noticed some entries in HBase, so this configuration does work.

	I will retry with the changes you recommended. Do you think I should be using

	some other channel type instead of memory?

	 

	Thanks,

	Suresh

	From: Hari Shreedharan [mailto:hshreedharan@cloudera.com] 
	Sent: Thursday, October 04, 2012 3:40 PM
	To: user@flume.apache.org
	Subject: Re: Flume Source and Sink in different hosts

	 

	Looks like your agent was set up properly. Can you increase the heap and try again? You can
do this by setting -Xmx in the flume-env.sh file. Try setting it to 1G or higher, since you
are using memory channel. Also I assume the file you are tailing is getting written to? I
strongly suggest using the AsyncHBaseSink.  
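
For reference, a sketch of what switching the HostB sink to the AsyncHBaseSink might look like. The agent, sink, table, and column family names are taken from the HostB configuration in this thread; whether the serializer's defaults suit the data is an assumption.

```properties
# Sketch only: replaces the HBaseSink definition for sink1 on agent1.
agent1.sinks.sink1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
agent1.sinks.sink1.channel = ch1
agent1.sinks.sink1.table = flumedemo
agent1.sinks.sink1.columnFamily = testing
# The async sink takes an async serializer; the synchronous
# SimpleHbaseEventSerializer is not interchangeable with it.
agent1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
```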

	 

	 

	Thanks,

	Hari

	 

	-- 

	Hari Shreedharan

	 

	On Thursday, October 4, 2012 at 3:19 PM, Kumar, Suresh wrote:

		Yes, my HBase has the table and column family. If I run the /etc/passwd test using the flume-ng
client, the table gets populated.

		 

		Here is the log from the source agent. There is not much in the sink log except for the
following entries, which seem to be benign.

		
		Thanks,

		Suresh

		 

		2012-10-04 14:59:05,622 (lifecycleSupervisor-1-0-SendThread(localhost:2181)) [DEBUG - org.apache.zookeeper.client.ZooKeeperSaslClient.clientTunneledAuthenticationInProgress(ZooKeeperSaslClient.java:515)]
Could not retrieve login configuration: java.lang.SecurityException: Unable to locate a login
configuration

		2012-10-04 14:59:08,414 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf for changes

		 

		source agent log:

		 

		 

		$ bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console
-n agent3

		 

		+ exec /usr/lib/jvm/java-6-sun/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/opt/flume/conf:/opt/flume/lib/*'
-Djava.library.path= org.apache.flume.node.Application -f conf/flume.conf -n agent3

		2012-10-04 15:09:30,778 (main) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:67)]
Starting lifecycle supervisor 1

		2012-10-04 15:09:30,791 (main) [INFO - org.apache.flume.node.FlumeNode.start(FlumeNode.java:54)]
Flume node starting - agent3

		2012-10-04 15:09:30,799 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:203)]
Node manager starting

		2012-10-04 15:09:30,801 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:67)]
Configuration provider starting

		2012-10-04 15:09:30,810 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:67)]
Starting lifecycle supervisor 10

		2012-10-04 15:09:30,813 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:207)]
Node manager started

		2012-10-04 15:09:30,819 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:86)]
Configuration provider started

		2012-10-04 15:09:30,819 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf for changes

		2012-10-04 15:09:30,821 (conf-file-poller-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:195)]
Reloading configuration file:conf/flume.conf

		2012-10-04 15:09:30,839 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:912)]
Added sinks: avro-sink Agent: agent3

		2012-10-04 15:09:30,840 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

		2012-10-04 15:09:30,840 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1002)]
Created context for avro-sink: hostname

		2012-10-04 15:09:30,841 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

		2012-10-04 15:09:30,841 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

		2012-10-04 15:09:30,841 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

		2012-10-04 15:09:30,841 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:295)]
Starting validation of configuration for agent: agent3, initial-configuration: AgentConfiguration[agent3]

		SOURCES: {tail={ parameters:{command=tail -F /var/log/auth.log, channels=MemoryChannel-1,
type=exec} }}

		CHANNELS: {MemoryChannel-1={ parameters:{type=memory} }}

		SINKS: {avro-sink={ parameters:{port=41414, hostname=sig-flume, type=avro, channel=MemoryChannel-1}
}}

		 

		2012-10-04 15:09:30,854 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:450)]
Created channel MemoryChannel-1

		2012-10-04 15:09:30,883 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:655)]
Creating sink: avro-sink using AVRO

		2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:353)]
Post validation configuration for agent3

		AgentConfiguration created without Configuration stubs for which only basic syntactical
validation was performed[agent3]

		SOURCES: {tail={ parameters:{command=tail -F /var/log/auth.log, channels=MemoryChannel-1,
type=exec} }}

		CHANNELS: {MemoryChannel-1={ parameters:{type=memory} }}

		SINKS: {avro-sink={ parameters:{port=41414, hostname=sig-flume, type=avro, channel=MemoryChannel-1}
}}

		 

		2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:117)]
Channels:MemoryChannel-1

		 

		2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:118)]
Sinks avro-sink

		 

		2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:119)]
Sources tail

		 

		2012-10-04 15:09:30,885 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:122)]
Post-validation flume configuration contains configuration  for agents: [agent3]

		2012-10-04 15:09:30,886 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:249)]
Creating channels

		2012-10-04 15:09:30,886 (conf-file-poller-0) [DEBUG - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:68)]
Creating instance of channel MemoryChannel-1 type memory

		2012-10-04 15:09:31,013 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)]
Monitoried counter group for type: CHANNEL, name: MemoryChannel-1, registered successfully.

		2012-10-04 15:09:31,013 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:273)]
created channel MemoryChannel-1

		2012-10-04 15:09:31,014 (conf-file-poller-0) [DEBUG - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:74)]
Creating instance of source tail, type exec

		2012-10-04 15:09:31,037 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:70)]
Creating instance of sink: avro-sink, type: avro

		2012-10-04 15:09:31,045 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)]
Monitoried counter group for type: SINK, name: avro-sink, registered successfully.

		2012-10-04 15:09:31,046 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:106)]
Starting new configuration:{ sourceRunners:{tail=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:tail,state:IDLE}
}} sinkRunners:{avro-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@e949f69
counterGroup:{ name:null counters:{} } }} channels:{MemoryChannel-1=org.apache.flume.channel.MemoryChannel{name:
MemoryChannel-1}} }

		2012-10-04 15:09:31,046 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:113)]
Starting Channel MemoryChannel-1

		2012-10-04 15:09:31,049 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:128)]
Waiting for channel: MemoryChannel-1 to start. Sleeping for 500 ms

		2012-10-04 15:09:31,052 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)]
Component type: CHANNEL, name: MemoryChannel-1 started

		2012-10-04 15:09:31,550 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:141)]
Starting Sink avro-sink

		2012-10-04 15:09:31,552 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.start(AvroSink.java:239)]
Starting AvroSink avro-sink { host: sig-flume, port: 41414 }...

		2012-10-04 15:09:31,552 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)]
Component type: SINK, name: avro-sink started

		2012-10-04 15:09:31,552 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:152)]
Starting Source tail

		2012-10-04 15:09:31,554 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)]
Avro sink avro-sink: Building RpcClient with hostname: sig-flume, port: 41414

		2012-10-04 15:09:31,561 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:155)]
Exec source starting with command:tail -F /var/log/auth.log

		2012-10-04 15:09:31,586 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.source.ExecSource.start(ExecSource.java:173)]
Exec source started

		2012-10-04 15:09:31,626 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:420)]
Batch size string = null

		2012-10-04 15:09:32,684 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:192)]
Avro sink avro-sink: Created RpcClient: NettyAvroRpcClient { host: sig-flume, port: 41414
}

		2012-10-04 15:09:32,685 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.start(AvroSink.java:253)]
Avro sink avro-sink started.

		2012-10-04 15:09:32,689 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)]
Polling sink runner starting

		2012-10-04 15:10:01,565 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf for changes

		2012-10-04 15:10:31,567 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(Abstr

		 

		 

		From: Hari Shreedharan [mailto:hshreedharan@cloudera.com] 
		Sent: Thursday, October 04, 2012 3:02 PM
		To: user@flume.apache.org
		Subject: Re: Flume Source and Sink in different hosts

		 

		Can you also send the logs of both agents? Does your HBase cluster have the table and
column family named in the configuration?

		 

		Also, are you sure the files are not getting rotated out? You should use tail -F so that
your setup keeps working even when files get rotated.

		 

		 

		Hari

		-- 

		Hari Shreedharan

		 

		On Thursday, October 4, 2012 at 2:53 PM, Kumar, Suresh wrote:

			Hello:

			 

			I have just downloaded and built flume-ng (apache-flume-1.3.0-SNAPSHOT).

			 

			My goal is to collect log data from HostA (source) and send it to HostB (sink). My initial
test (sending /etc/passwd from HostA to HostB) worked fine, and I was also able to load the passwd
file into my HBase on HostB.

			 

			Now I want to load a continuous stream of log data (using tail -f), but I was not able
to replicate the above process. Flume started fine on HostA, but I do not see any data being
received by HostB or in my HBase.

			 

			What is wrong with my configuration?

			 

			Thanks,

			Suresh

			 

			Here is my flume.conf in HostA

			 

			agent3.sources = tail

			agent3.channels = MemoryChannel-1

			agent3.sinks = avro-sink

			 

			# Define source flow

			agent3.sources.tail.type = exec

			agent3.sources.tail.command = tail -f /var/log/auth.log

			agent3.sources.tail.channels = MemoryChannel-1

			 

			# What kind of channel

			agent3.channels.MemoryChannel-1.type = memory

			 

			# avro sink properties

			agent3.sinks.avro-sink.type = avro

			agent3.sinks.avro-sink.channel = MemoryChannel-1

			agent3.sinks.avro-sink.hostname = hostb

			agent3.sinks.avro-sink.port = 41414

			 

			Here is my flume.conf in HostB

			 

			# Define a memory channel called ch1 on agent1

			agent1.channels.ch1.type = memory

			 

			# Define an Avro source called avro-source1 on agent1 and tell it

			# to bind to 0.0.0.0:41414. Connect it to channel ch1.

			agent1.sources.avro-source1.channels = ch1

			agent1.sources.avro-source1.type = avro

			agent1.sources.avro-source1.bind = 0.0.0.0

			agent1.sources.avro-source1.port = 41414

			 

			# Define a logger sink that simply logs all events it receives

			# and connect it to the other end of the same channel.

			agent1.sinks.log-sink1.channel = ch1

			agent1.sinks.log-sink1.type = logger

			 

			# Finally, now that we've defined all of our components, tell

			# agent1 which ones we want to activate.

			agent1.channels = ch1

			agent1.sources = avro-source1

			#agent1.sources = avro-source1

			agent1.sinks = sink1

			 

			agent1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink

			agent1.sinks.sink1.channel = ch1

			agent1.sinks.sink1.table = flumedemo

			agent1.sinks.sink1.columnFamily = testing

			agent1.sinks.sink1.column = foo

			agent1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer

			agent1.sinks.sink1.serializer.payloadColumn = col1

			agent1.sinks.sink1.serializer.keyType = timestamp

			agent1.sinks.sink1.serializer.rowPrefix = 1

			agent1.sinks.sink1.serializer.suffix = timestamp

			agent1.sinks.sink1.serializer.payloadColumn = pcol

			agent1.sinks.sink1.serializer.incrementColumn = icol

			 

		 

	 

 
