flume-user mailing list archives

From Mohit Anchlia <mohitanch...@gmail.com>
Subject Re: Is this my config problem
Date Sat, 16 Jun 2012 00:51:24 GMT
After I changed my config to the following, it worked. It looks like Flume rolls to a
new file as soon as whichever of the roll conditions is met first. Since rollCount
defaults to 10, it was creating a new file every 10 events. But I think this causes a
lot of problems, because I now need to keep track of and estimate all of these
variables. I think it should just roll based on what's specified in the config, so if I
only specify rollSize then it shouldn't consider any of the other options in its logic
for creating a new file.

foo.sinks.hdfsSink.type = hdfs
foo.sinks.hdfsSink.hdfs.path = hdfs://dsdb1:54310/flume/%{host}
foo.sinks.hdfsSink.hdfs.filePrefix = web
foo.sinks.hdfsSink.hdfs.rollInterval  = 600
foo.sinks.hdfsSink.hdfs.rollCount  = 200000000
foo.sinks.hdfsSink.hdfs.rollSize  = 5000000000
foo.sinks.hdfsSink.hdfs.fileType  = SequenceFile
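
If I understand the HDFS sink's behaviour correctly, setting a roll property to 0
disables that trigger entirely, so instead of picking very large numbers the unwanted
triggers can simply be switched off. A minimal sketch (the 0 values are my assumption,
not something I have tested):

foo.sinks.hdfsSink.type = hdfs
foo.sinks.hdfsSink.hdfs.path = hdfs://dsdb1:54310/flume/%{host}
foo.sinks.hdfsSink.hdfs.filePrefix = web
# 0 = never roll based on elapsed time
foo.sinks.hdfsSink.hdfs.rollInterval = 0
# 0 = never roll based on number of events
foo.sinks.hdfsSink.hdfs.rollCount = 0
# roll only when the file reaches ~5 GB
foo.sinks.hdfsSink.hdfs.rollSize = 5000000000
foo.sinks.hdfsSink.hdfs.fileType = SequenceFile

That way rollSize alone decides when a new file is created.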



On Fri, Jun 15, 2012 at 5:38 PM, Mohit Anchlia <mohitanchlia@gmail.com> wrote:

> What I am seeing is that a new file is created in HDFS for every event that I send.
> I was expecting the file handle to just keep writing to the existing file until it
> gets rolled over as specified in the config. Am I doing something wrong?
>
>  12/06/15 17:28:52 INFO hdfs.BucketWriter: Creating
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027956.tmp
> 12/06/15 17:28:52 INFO hdfs.BucketWriter: Renaming
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027956.tmp to
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027956
> 12/06/15 17:28:52 INFO hdfs.BucketWriter: Creating
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027957.tmp
> 12/06/15 17:28:52 INFO hdfs.BucketWriter: Renaming
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027957.tmp to
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027957
> 12/06/15 17:28:52 INFO hdfs.BucketWriter: Creating
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027958.tmp
> 12/06/15 17:28:52 INFO hdfs.BucketWriter: Renaming
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027958.tmp to
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027958
> 12/06/15 17:28:52 INFO hdfs.BucketWriter: Creating
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027959.tmp
> 12/06/15 17:28:53 INFO hdfs.BucketWriter: Renaming
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027959.tmp to
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027959
> 12/06/15 17:28:53 INFO hdfs.BucketWriter: Creating
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027960.tmp
> 12/06/15 17:28:53 INFO hdfs.BucketWriter: Renaming
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027960.tmp to
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027960
> 12/06/15 17:28:53 INFO hdfs.BucketWriter: Creating
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027961.tmp
> 12/06/15 17:28:53 INFO hdfs.BucketWriter: Renaming
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027961.tmp to
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027961
> 12/06/15 17:28:53 INFO hdfs.BucketWriter: Creating
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027962.tmp
> 12/06/15 17:28:53 INFO hdfs.BucketWriter: Renaming
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027962.tmp to
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027962
> 12/06/15 17:28:53 INFO hdfs.BucketWriter: Creating
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027963.tmp
> 12/06/15 17:28:53 INFO hdfs.BucketWriter: Renaming
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027963.tmp to
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027963
> 12/06/15 17:28:53 INFO hdfs.BucketWriter: Creating
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027964.tmp
> 12/06/15 17:28:53 INFO hdfs.BucketWriter: Renaming
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027964.tmp to
> hdfs://dsdb1:54310/flume/'dslg1'/FlumeData.1339806027964
>
> foo.sources = avroSrc
> foo.channels = memoryChannel
> foo.sinks = hdfsSink
> # For each one of the sources, the type is defined
> foo.sources.avroSrc.type = avro
> # The channel can be defined as follows.
> foo.sources.avroSrc.channels = memoryChannel
> foo.sources.avroSrc.bind = 0.0.0.0
> foo.sources.avroSrc.port = 41414
> # Each sink's type must be defined
> foo.sinks.hdfsSink.type = hdfs
> foo.sinks.hdfsSink.hdfs.path = hdfs://dsdb1:54310/flume/'%{host}'
> foo.sinks.hdfsSink.file.Prefix = web
> foo.sinks.hdfsSink.file.rollInterval  = 600
> foo.sinks.hdfsSink.file.Type  = SequenceFile
> #Specify the channel the sink should use
> foo.sinks.hdfsSink.channel = memoryChannel
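> # Note: 'file.Prefix', 'file.rollInterval' and 'file.Type' above don't match the
> # 'hdfs.filePrefix', 'hdfs.rollInterval' and 'hdfs.fileType' keys used in the working
> # config, so the sink presumably ignored them and ran with its defaults (rollCount = 10).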
>
> code:
>
>  public void sendDataToFlume(String data) {
>   // Build a Flume event carrying the payload and a "host" header
>   // (the header feeds the %{host} escape in hdfs.path)
>   Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
>   Map<String, String> headers = new HashMap<String, String>();
>   headers.put("host", hostName);
>   event.setHeaders(headers);
>   try {
>    rpcClient.append(event);
>   } catch (EventDeliveryException e) {
>    // on a delivery failure, re-establish the connection (the failed event is not retried)
>    connect();
>   }
>  }
>
>
>  @Test
>  public void testAvroClient() throws InterruptedException{
>   AvroClient aClient = new AvroClient();
>   int i = 0;
>   int j = 500;
>   while(i++ < j){
>    aClient.sendDataToFlume("Hello");
>    if(i == j/2){
>     //Thread.sleep(30000);
>    }
>   }
>
>  }
>  }
>
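
The snippets above reference hostName, rpcClient and connect(), which aren't shown.
A minimal sketch of how such a wrapper might be wired with the stock Flume RPC client
(only RpcClient/RpcClientFactory are the real API; the class layout, host/port and
field names are assumptions based on the config and code above):

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;

public class AvroClient {

  // value for the "host" header, and hence for the %{host} escape in hdfs.path
  private String hostName;
  private RpcClient rpcClient;

  public AvroClient() {
    try {
      hostName = InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
      hostName = "unknown";
    }
    connect();
  }

  // (Re)create the Avro RPC client pointing at the avroSrc source from the config
  private void connect() {
    if (rpcClient != null) {
      rpcClient.close();
    }
    rpcClient = RpcClientFactory.getDefaultInstance("localhost", 41414);
  }

  public void close() {
    if (rpcClient != null) {
      rpcClient.close();
    }
  }

  // sendDataToFlume(String) as quoted above would live here
}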
