flume-user mailing list archives

From Kevin Lee <defeng...@yottaa.com>
Subject Re: How to use LZO in Flume-ng
Date Wed, 29 Aug 2012 01:14:11 GMT
Thanks for your reply, Denny.

I figured it out; the key configuration difference is:

    agent.sinks.lzo-hdfs-write.hdfs.codeC = com.hadoop.compression.lzo.LzopCodec
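
For anyone who hits the same problem, here is the full sink block with that change applied (a sketch reusing the names and paths from my configuration quoted below):

    agent.sinks.lzo-hdfs-write.type = hdfs
    agent.sinks.lzo-hdfs-write.hdfs.path = hdfs://10.34.4.55:8020/tmp/
    agent.sinks.lzo-hdfs-write.hdfs.fileType = CompressedStream
    agent.sinks.lzo-hdfs-write.hdfs.codeC = com.hadoop.compression.lzo.LzopCodec

With LzopCodec the sink writes .lzo files instead of raw .lzo_deflate streams, so both lzop -d and the LzoIndexer can read them.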

Thanks,
- Kevin

On Aug 28, 2012, at 3:28 PM, Denny Ye wrote:

> 'com.hadoop.compression.lzo.LzoCodec' is one implementation of 'org.apache.hadoop.io.compress.CompressionCodec'.
> 
> 2012/8/28 Denny Ye <dennyy99@gmail.com>
> hi Kevin,
>     I applied for LZO successfully. I will post my LZO configuration, you can compare
the difference.
>     
>     1. agent.sinks.hdfsSin1.hdfs.codeC = com.hadoop.compression.lzo.LzoCodec
>     2. Added this configuration at Hadoop core-site.xml
>        <property>
>             <name>io.compression.codecs</name>
>             <value>com.hadoop.compression.lzo.LzoCodec</value>
>        </property>
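> 
>        For completeness, hadoop-lzo deployments usually register both codec classes and pin the native LZO implementation; a sketch (the exact codec list may vary per cluster):
> 
>        <property>
>             <name>io.compression.codecs</name>
>             <value>com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value>
>        </property>
>        <property>
>             <name>io.compression.codec.lzo.class</name>
>             <value>com.hadoop.compression.lzo.LzoCodec</value>
>        </property>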
> 
> -Regards
> Denny Ye
>    
> 
> 2012/8/28 Kevin Lee <defeng.li@yottaa.com>
> Folks,
> 
> I was following the post "Hadoop at Twitter (part 1): Splittable LZO Compression" to integrate LZO into Hadoop 2.0, but Flume-ng LZO compression does not seem to work.
> 
> My flume-ng configuration file is:
> 
> cat > /tmp/flume-lzo.conf <<EOF
> agent.sources = lzo-avro-collect
> agent.channels = lzo-memory-channel
> agent.sinks = lzo-hdfs-write
> 
> agent.sources.lzo-avro-collect.type = avro
> agent.sources.lzo-avro-collect.bind = 0.0.0.0
> agent.sources.lzo-avro-collect.port = 12345
> agent.sources.lzo-avro-collect.channels = lzo-memory-channel
> agent.channels.lzo-memory-channel.type = memory
> agent.channels.lzo-memory-channel.capacity = 1000000
> agent.channels.lzo-memory-channel.transactionCapacity = 10000
> agent.channels.lzo-memory-channel.stay-alive = 3
> agent.sinks.lzo-hdfs-write.type = hdfs
> agent.sinks.lzo-hdfs-write.hdfs.path = hdfs://10.34.4.55:8020/tmp/
> agent.sinks.lzo-hdfs-write.hdfs.filePrefix = test%Y
> agent.sinks.lzo-hdfs-write.channel = lzo-memory-channel
> agent.sinks.lzo-hdfs-write.hdfs.rollInterval = 3600
> agent.sinks.lzo-hdfs-write.hdfs.rollSize = 209715200
> agent.sinks.lzo-hdfs-write.hdfs.rollCount = 0
> agent.sinks.lzo-hdfs-write.hdfs.batchSize = 1000
> agent.sinks.lzo-hdfs-write.hdfs.codeC = lzo
> agent.sinks.lzo-hdfs-write.hdfs.fileType = CompressedStream
> EOF
> And I start the flume-ng agent in the foreground:
> 
> sudo -u flume flume-ng agent -n agent -f /tmp/flume-lzo.conf
> Then I use avro-client to ship an event:
> 
> echo aaaaaaaaaaaaaaaaa > /tmp/events
> sudo -u flume flume-ng avro-client -H localhost -p 12345 -F /tmp/events
> The flume-ng agent collector log is as follows:
> 
> 12/08/28 06:33:53 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
> 12/08/28 06:33:53 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 6bb1b7f8b9044d8df9b4d2b6641db7658aab3cf8]
> 12/08/28 06:33:54 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
> 12/08/28 06:33:54 INFO nodemanager.DefaultLogicalNodeManager: Starting new configuration:{ sourceRunners:{lzo-avro-collect=EventDrivenSourceRunner: { source:AvroSource: { bindAddress:0.0.0.0 port:12345 } }} sinkRunners:{lzo-hdfs-write=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@39e57e8f counterGroup:{ name:null counters:{} } }} channels:{lzo-memory-channel=org.apache.flume.channel.MemoryChannel@9d7fbfb} }
> 12/08/28 06:33:54 INFO nodemanager.DefaultLogicalNodeManager: Starting Channel lzo-memory-channel
> 12/08/28 06:33:54 INFO nodemanager.DefaultLogicalNodeManager: Starting Sink lzo-hdfs-write
> 12/08/28 06:33:54 INFO nodemanager.DefaultLogicalNodeManager: Starting Source lzo-avro-collect
> 12/08/28 06:33:54 INFO source.AvroSource: Avro source starting:AvroSource: { bindAddress:0.0.0.0 port:12345 }
> 12/08/28 06:34:02 INFO ipc.NettyServer: [id: 0x651db6bb, /127.0.0.1:48085 => /127.0.0.1:12345] OPEN
> 12/08/28 06:34:02 INFO ipc.NettyServer: [id: 0x651db6bb, /127.0.0.1:48085 => /127.0.0.1:12345] BOUND: /127.0.0.1:12345
> 12/08/28 06:34:02 INFO ipc.NettyServer: [id: 0x651db6bb, /127.0.0.1:48085 => /127.0.0.1:12345] CONNECTED: /127.0.0.1:48085
> 12/08/28 06:34:02 INFO ipc.NettyServer: [id: 0x651db6bb, /127.0.0.1:48085 :> /127.0.0.1:12345] DISCONNECTED
> 12/08/28 06:34:02 INFO ipc.NettyServer: [id: 0x651db6bb, /127.0.0.1:48085 :> /127.0.0.1:12345] UNBOUND
> 12/08/28 06:34:02 INFO ipc.NettyServer: [id: 0x651db6bb, /127.0.0.1:48085 :> /127.0.0.1:12345] CLOSED
> 12/08/28 06:34:03 INFO hdfs.BucketWriter: Creating hdfs://10.34.4.55:8020/tmp//test.1346135643045.lzo_deflate.tmp
> 12/08/28 06:34:04 WARN conf.Configuration: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
> ^C12/08/28 06:34:26 INFO node.FlumeNode: Flume node stopping - agent
> 12/08/28 06:34:26 INFO lifecycle.LifecycleSupervisor: Stopping lifecycle supervisor 8
> 12/08/28 06:34:26 INFO nodemanager.DefaultLogicalNodeManager: Node manager stopping
> 12/08/28 06:34:26 INFO lifecycle.LifecycleSupervisor: Stopping lifecycle supervisor 8
> 12/08/28 06:34:26 INFO source.AvroSource: Avro source stopping:AvroSource: { bindAddress:0.0.0.0 port:12345 }
> 12/08/28 06:34:26 INFO hdfs.HDFSEventSink: Closing hdfs://10.34.4.55:8020/tmp//test
> 12/08/28 06:34:26 INFO hdfs.BucketWriter: Renaming hdfs://10.34.4.55:8020/tmp/test.1346135643045.lzo_deflate.tmp to hdfs://10.34.4.55:8020/tmp/test.1346135643045.lzo_deflate
> 12/08/28 06:34:26 INFO properties.PropertiesFileConfigurationProvider: Configuration provider stopping
> When I shut down the collector, the agent generated a "/tmp/test.1346135643045.lzo_deflate" file. It looks OK, but I can't uncompress it. The commands are as follows:
> 
> [root@ip-10-34-4-55 tmp]# sudo -u hdfs hadoop fs -ls /tmp/test*
> Found 1 items
> -rw-r--r--   3 flume supergroup         30 2012-08-28 06:34 /tmp/test.1346135643045.lzo_deflate
> [root@ip-10-34-4-55 tmp]# sudo -u hdfs hadoop fs -get /tmp/test.1346135643045.lzo_deflate ./
> 12/08/28 06:36:26 INFO util.NativeCodeLoader: Loaded the native-hadoop library
> [root@ip-10-34-4-55 tmp]# lzop -d test.1346135643045.lzo_deflate
> lzop: test.1346135643045.lzo_deflate: not a lzop file
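> 
> For reference, the lzop container format starts with the magic bytes 89 4c 5a 4f ("\x89LZO"), so the header can be checked directly (a sketch against the file fetched above):
> 
> head -c 4 test.1346135643045.lzo_deflate | od -An -tx1
> # a valid lzop file prints: 89 4c 5a 4f; this file does not start with that magic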
> I also verified the Flume-ng-generated LZO file with Hadoop's LzoIndexer; that does not work either. The commands and exception are as follows:
> 
> [root@ip-10-34-4-55 tmp]# sudo -u hdfs hadoop fs -cp /tmp/test.1346135643045.lzo_deflate /tmp/test.1346135643045.lzo
> export JAVA_LIBRARY_PATH=/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native/Linux-amd64-64
> hadoop jar /usr/lib/hadoop/lib/hadoop-lzo-0.4.15.jar com.hadoop.compression.lzo.LzoIndexer /tmp/test.1346135643045.lzo
> 
> [root@ip-10-34-4-55 tmp]# hadoop jar /usr/lib/hadoop/lib/hadoop-lzo-0.4.15.jar com.hadoop.compression.lzo.LzoIndexer /tmp/test.1346135643045.lzo
> 
> 12/08/28 06:40:14 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
> 12/08/28 06:40:14 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 6bb1b7f8b9044d8df9b4d2b6641db7658aab3cf8]
> 12/08/28 06:40:15 INFO lzo.LzoIndexer: [INDEX] LZO Indexing file /tmp/test.1346135643045.lzo, size 0.00 GB...
> 12/08/28 06:40:15 INFO util.NativeCodeLoader: Loaded the native-hadoop library
> 12/08/28 06:40:15 WARN snappy.LoadSnappy: Snappy native library is available
> 12/08/28 06:40:15 INFO snappy.LoadSnappy: Snappy native library loaded
> 12/08/28 06:40:15 WARN conf.Configuration: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
> 12/08/28 06:40:15 ERROR lzo.LzoIndexer: Error indexing /tmp/test.1346135643045.lzo
> java.io.IOException: Invalid LZO header
>     at com.hadoop.compression.lzo.LzopInputStream.readHeader(LzopInputStream.java:116)
>     at com.hadoop.compression.lzo.LzopInputStream.<init>(LzopInputStream.java:54)
>     at com.hadoop.compression.lzo.LzopCodec.createInputStream(LzopCodec.java:83)
>     at com.hadoop.compression.lzo.LzoIndex.createIndex(LzoIndex.java:231)
>     at com.hadoop.compression.lzo.LzoIndexer.indexSingleFile(LzoIndexer.java:117)
>     at com.hadoop.compression.lzo.LzoIndexer.indexInternal(LzoIndexer.java:98)
>     at com.hadoop.compression.lzo.LzoIndexer.index(LzoIndexer.java:52)
>     at com.hadoop.compression.lzo.LzoIndexer.main(LzoIndexer.java:137)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:597)
>     at org.apache.hadoop.util.RunJar.main(RunJar.java:208)
> [root@ip-10-34-4-55 tmp]#
> Thanks, 
> - Kevin
> 

