apex-users mailing list archives

From Sandesh Hegde <sand...@datatorrent.com>
Subject Re: Some Questions on consuming/restarting/monitoring Apex Kafka consumer
Date Fri, 09 Dec 2016 20:31:53 GMT
To add to Ram's point,

The Kafka input operator uses the application name as the consumer group
name. When you restart the application, it should automatically resume from
the stored offset.
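
As a rough illustration of what APPLICATION_OR_LATEST is expected to do,
here is a minimal sketch. It is not the operator's actual code; the class
StartOffsetSketch and the method resolveStartOffset are made up for this
example. The assumption is that the operator resumes from the offset
committed under the consumer group (the application name) when one exists,
and otherwise falls back to the latest offset:

```java
import java.util.OptionalLong;

public class StartOffsetSketch {

    /**
     * Hypothetical helper, not part of Malhar.
     *
     * @param committedForGroup offset stored for the consumer group
     *                          (the application name), if any
     * @param latestOffset      current end offset of the partition
     * @return the offset consumption would start from
     */
    public static long resolveStartOffset(OptionalLong committedForGroup,
                                          long latestOffset) {
        // Resume from the stored offset when present, otherwise use latest.
        return committedForGroup.orElse(latestOffset);
    }

    public static void main(String[] args) {
        // A stored offset exists for the group: resume from it.
        System.out.println(resolveStartOffset(OptionalLong.of(42L), 100L));
        // Nothing stored for the group: start from the latest offset.
        System.out.println(resolveStartOffset(OptionalLong.empty(), 100L));
    }
}
```

The second case matches what you are seeing: if no offset is found for the
group, a restart begins at the latest offset.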

The first problem you mentioned is caused by the behavior you describe in
point 3. Can you please verify your Kafka setup once using the console
consumer and producer?

On Fri, Dec 9, 2016 at 10:58 AM Munagala Ramanath <ram@datatorrent.com>
wrote:

> For 1, try launching with the previous application id documented at:
> http://docs.datatorrent.com/apexcli/
> under the *-originalAppId* parameter of the *launch* command. The
> starting offset can also be controlled
> by the *initialOffset* configuration property documented at:
> http://apex.apache.org/docs/malhar/operators/kafkaInputOperator/
>
> For 2, the abstract base class is tracking some metrics in the metrics
> field annotated with *@AutoMetric*.
> These stats are retrievable via a *StatsListener* associated with the
> operator. An example that logs
> metrics to the Application Master log file (*dt.log*) is available at (
> *kafka-stats* branch):
>
> https://github.com/amberarrow/examples/tree/kafka-stats/tutorials/kafka
>
> The log lines it prints look something like this:
>
> *2016-12-09 10:39:01,456 INFO com.example.myapexapp.KafkaStatsListener: 1:
> processStats: metrics.size = 1*
> *2016-12-09 10:39:01,456 INFO com.example.myapexapp.KafkaStatsListener:
> metric: key = metrics, value.class =
> org.apache.apex.malhar.kafka.KafkaMetrics*
> *2016-12-09 10:39:01,456 INFO com.example.myapexapp.KafkaStatsListener:
> bytes-consumed-rate = 7614.458510728283, records-consumed-rate =
> 253.8277174679261*
>
> These are the metrics documented at:
> http://docs.confluent.io/2.0.1/kafka/monitoring.html
>
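If you want to chart these rates outside Apex, they can be pulled out of a
log line like the last one above with a small regex. This is a minimal
sketch that assumes the log format shown above; the class
KafkaRateLogParser and the method parseRates are made up for illustration
and are not part of Malhar or the linked example:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KafkaRateLogParser {

    // Matches "some-name-rate = 123.45" pairs as they appear in the log line.
    private static final Pattern PAIR =
        Pattern.compile("([a-z][a-z-]*-rate)\\s*=\\s*([0-9]+(?:\\.[0-9]+)?)");

    /** Returns metric name to value for every "*-rate = number" pair found. */
    public static Map<String, Double> parseRates(String line) {
        Map<String, Double> rates = new LinkedHashMap<>();
        Matcher m = PAIR.matcher(line);
        while (m.find()) {
            rates.put(m.group(1), Double.parseDouble(m.group(2)));
        }
        return rates;
    }

    public static void main(String[] args) {
        String line = "2016-12-09 10:39:01,456 INFO "
            + "com.example.myapexapp.KafkaStatsListener: "
            + "bytes-consumed-rate = 7614.458510728283, "
            + "records-consumed-rate = 253.8277174679261";
        // Prints the two rates keyed by metric name.
        System.out.println(parseRates(line));
    }
}
```

Lines without any "-rate" pair simply yield an empty map, so the parser can
be run over the whole dt.log.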
> For 3, perhaps the Kafka experts can respond.
>
> Ram
>
> On Thu, Dec 8, 2016 at 8:36 AM, Arvindan Thulasinathan <
> aravindan.thulasinathan@oracle.com> wrote:
>
> Hi,
>   I am using “org.apache.apex.malhar.kafka.AbstractKafkaInputOperator”,
> which supports version 0.9.0+ of the Kafka consumer API. I have
> InitialOffset set to “APPLICATION_OR_LATEST”.
>
> I have a few questions.
> 1. When I kill my application and restart, I expect the app to restart
> from the last offset it consumed. But this is not happening. It
> automatically starts consuming from the latest offset. Is it because I am
> killing the app and not shutting-down?
> 2. Is there a way to measure my Kafka consumption rate from the Apex
> application? I am looking for something out-of-box rather than sending the
> consumption rate over a stream to an output operator.
> 3. I couldn’t track offsets on the Kafka broker.
> I tried using the "bin/kafka-consumer-groups.sh --new-consumer
> --bootstrap-server <kafka-broker>:<port> --list" script on my Kafka broker,
> but the result is empty. Any idea whether I need to add some parameter/config
> to my Apex Kafka consumer to enable it to commit offsets to the broker?
>
>
> Thanks,
> Aravindan
>
>
> --
Thanks
Sandesh
