flume-user mailing list archives

From Gwen Shapira <gshap...@cloudera.com>
Subject Re: Flume Consumer name
Date Tue, 10 Mar 2015 20:44:01 GMT
We start from the most recent offset by default.
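
(If you want to control this explicitly, the initial position can be overridden by passing the Kafka consumer property through the source config. The "kafka." pass-through prefix below assumes the Flume 1.6-style KafkaSource; "smallest" starts from the oldest offset, "largest" from the most recent:)

flume1.sources.kafka-source-1.kafka.auto.offset.reset = smallest

Note that auto.offset.reset only takes effect when the consumer group has no committed offset yet, which is another reason switching to a fresh group name matters here.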

On Tue, Mar 10, 2015 at 1:31 PM, Alex Bohr <alex@gradientx.com> wrote:
> Cool, easy fix.
>
> What about the initial offset - does the consumer start from the oldest or
> the most recent offset?
>
> Thanks!
>
> On Tue, Mar 10, 2015 at 12:31 PM, Gwen Shapira <gshapira@cloudera.com>
> wrote:
>>
>> Just a small typo, I think.
>>
>> should be groupId (capital I. Flume standardized on camel case)
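>>
>> i.e., with your new group name the line should read:
>>
>> flume1.sources.kafka-source-1.groupId = newfloom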
>>
>>
>>
>> On Tue, Mar 10, 2015 at 12:09 PM, Alex Bohr <alex@gradientx.com> wrote:
>> > After running some flafka tests I wanted to switch the name of the
>> > consumer group to reset the offset to the most recent, instead of
>> > waiting for the agents to plow through the backlog.
>> > So I changed the value in the "groupId" property.
>> >
>> > But I don't see the new consumer group in Kafka.  It looks like the
>> > agents
>> > are still consuming under the original consumer group name "flume."
>> >
>> > But I know the new config file is being used because I also changed the
>> > directory in the sink and they are writing to the new HDFS path.
>> >
>> > Here's my new config file:
>> >
>> > flume1.sources  = kafka-source-1
>> > flume1.channels = hdfs-channel-1
>> > flume1.sinks    = hdfs-sink-1
>> >
>> > # For each source, channel, and sink, set
>> > # standard properties.
>> > flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
>> > flume1.sources.kafka-source-1.zookeeperConnect = xxx.xx.xx.107:2181/kafkaCluster
>> > flume1.sources.kafka-source-1.topic = Events3
>> > flume1.sources.kafka-source-1.groupid = newfloom
>> > flume1.sources.kafka-source-1.batchSize = 10000
>> > flume1.sources.kafka-source-1.channels = hdfs-channel-1
>> >
>> > flume1.channels.hdfs-channel-1.type   = memory
>> > flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
>> > flume1.sinks.hdfs-sink-1.type = hdfs
>> > flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Writable
>> > flume1.sinks.hdfs-sink-1.hdfs.fileType = SequenceFile
>> > flume1.sinks.hdfs-sink-1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%{host}-1-sequence-events
>> > flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
>> > flume1.sinks.hdfs-sink-1.hdfs.path = hdfs://xxx.xx.xxx.41:8020/user/gxetl/test_flume_2/%{topic}
>> > flume1.sinks.hdfs-sink-1.hdfs.rollCount=0
>> > flume1.sinks.hdfs-sink-1.hdfs.rollSize=0
>> > flume1.sinks.hdfs-sink-1.hdfs.rollInterval=120
>> >
>> > # Other properties are specific to each type of
>> > # source, channel, or sink. In this case, we
>> > # specify the capacity of the memory channel.
>> > flume1.channels.hdfs-channel-1.capacity = 500000
>> > flume1.channels.hdfs-channel-1.transactionCapacity = 100000
>> >
>> >
>> >
>> > Any advice on how to change the consumer group name?
>> >
>> > Also, how does the Kafka consumer set its initial offset when starting
>> > a new source/consumer?
>> > I'm hoping it starts at the most recent, but it probably starts at the
>> > oldest offset. And if so, how can I get it to start at the most recent?
>> >
>> > Thanks!
>
>
