flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Registering custom metrics does not work
Date Thu, 06 Jul 2017 06:45:14 GMT
Hello,

Please provide more information on how it is not working as expected.

Does it throw an exception or log a warning? Is the metric not
registered at all, or is the value not changing?
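
To make the distinction between those cases concrete, here is a minimal,
Flink-free sketch. SimpleGauge, SimpleRegistry and LatencyTracker are
hypothetical stand-ins invented for illustration; they are not Flink classes,
and the real MetricGroup may behave differently. The sketch only shows that a
gauge registered lazily on the first element does not exist before that
element arrives, and that its value is read from the field at query time.

```scala
// Hypothetical stand-ins for illustration only; these are NOT Flink classes.
trait SimpleGauge[T] { def getValue: T }

final class SimpleRegistry {
  private val gauges = scala.collection.mutable.Map.empty[String, SimpleGauge[_]]

  // Stores the gauge under the given name and returns it.
  def gauge[T](name: String, g: SimpleGauge[T]): SimpleGauge[T] = {
    gauges.update(name, g)
    g
  }

  def isRegistered(name: String): Boolean = gauges.contains(name)

  // Reads the current value; None means the gauge was never registered.
  def read(name: String): Option[Any] = gauges.get(name).map(_.getValue)
}

final class LatencyTracker(registry: SimpleRegistry) {
  private var latency: Long = 0L
  private var initialized = false

  // Registers the gauge on first use, mirroring the lazy init in the quoted sink.
  private def init(): Unit =
    if (!initialized) {
      registry.gauge("esLatency", new SimpleGauge[Long] { def getValue: Long = latency })
      initialized = true
    }

  // Called once per element; updates the field the gauge reads from.
  def record(eventTimeMillis: Long, nowMillis: Long): Unit = {
    init()
    latency = nowMillis - eventTimeMillis
  }
}

object GaugeCheck {
  def main(args: Array[String]): Unit = {
    val registry = new SimpleRegistry
    val tracker = new LatencyTracker(registry)

    // Case 1: no element processed yet, so nothing is registered.
    println(registry.isRegistered("esLatency")) // false

    // Case 2: after the first element the gauge exists and has a value.
    tracker.record(eventTimeMillis = 1000L, nowMillis = 1250L)
    println(registry.isRegistered("esLatency")) // true
    println(registry.read("esLatency"))         // Some(250)

    // Case 3: the value follows the field on every later element.
    tracker.record(eventTimeMillis = 2000L, nowMillis = 2100L)
    println(registry.read("esLatency"))         // Some(100)
  }
}
```

In a real job the same three checks (is it registered, does it have a value,
does the value move) would be made against the logs and the configured
metric reporter rather than a hand-written registry.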

On 06.07.2017 08:10, wyphao.2007 wrote:
> Hi, all
> I want to know each element's latency before it is written to
> Elasticsearch, so I registered a custom metric as follows:
>
> class CustomElasticsearchSinkFunction extends ElasticsearchSinkFunction[EventEntry] {
>   private var metricGroup: Option[MetricGroup] = None
>   private var latency: Long = _
>
>   private def init(runtimeContext: RuntimeContext): Unit = {
>     if (metricGroup.isEmpty) {
>       metricGroup = Some(runtimeContext.getMetricGroup)
>       metricGroup.get.gauge[Long, Gauge[Long]]("esLatency", ScalaGauge[Long](() => latency))
>     }
>   }
>
>   def createIndexRequest(element: EventEntry, runtimeContext: RuntimeContext): IndexRequest = {
>     init(runtimeContext)
>     latency = System.currentTimeMillis() - element.executeTime.getMillis
>     Requests.indexRequest.index("test").`type`("event").source(element.json)
>   }
>
>   override def process(element: EventEntry,
>                        runtimeContext: RuntimeContext,
>                        requestIndexer: RequestIndexer): Unit =
>     requestIndexer.add(createIndexRequest(element, runtimeContext))
> }
>
> But that does not seem to work. Does anyone know why?
>
> Regards
> wyp
>

