flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "wyphao.2007" <wyphao.2...@163.com>
Subject Registering custom metrics does not work
Date Thu, 06 Jul 2017 06:10:36 GMT
Hi, all
I want to know each element's latency before it is written to Elasticsearch, so I am registering a custom
metric as follows:


/**
 * Elasticsearch sink function for [[EventEntry]] elements that also exposes a gauge named
 * "esLatency" on the runtime context's metric group.
 *
 * The gauge reports the most recently computed difference, in milliseconds, between
 * `System.currentTimeMillis()` and the element's `executeTime` at the moment the index
 * request for that element was built.
 */
class CustomElasticsearchSinkFunction extends ElasticsearchSinkFunction[EventEntry] {
  // Metric group taken from the runtime context on first use; stays None until the gauge
  // has been registered, so registration happens at most once per instance.
  private var metricGroup: Option[MetricGroup] = None

  // Last observed latency in milliseconds. Written in createIndexRequest and read through the
  // gauge closure. NOTE(review): the metrics system is expected to poll gauges from a thread
  // other than the one processing elements, hence @volatile so the reader sees fresh values.
  @volatile private var latency: Long = _

  /**
   * Registers the "esLatency" gauge the first time it is called; later calls are no-ops.
   *
   * @param runtimeContext context supplying the metric group to register on
   */
  private def init(runtimeContext: RuntimeContext): Unit = {
    if (metricGroup.isEmpty) {
      // Bind the group locally instead of calling Option.get on the field.
      val group = runtimeContext.getMetricGroup
      metricGroup = Some(group)
      group.gauge[Long, Gauge[Long]]("esLatency", ScalaGauge[Long](() => latency))
    }
  }

  /**
   * Builds the index request for one element and updates the latency value read by the gauge.
   *
   * @param element        event to index; its `executeTime` is the latency reference point
   * @param runtimeContext context used to lazily register the gauge
   * @return an index request targeting index "test", type "event", with `element.json` as source
   */
  def createIndexRequest(element: EventEntry, runtimeContext: RuntimeContext): IndexRequest = {
    init(runtimeContext)
    latency = System.currentTimeMillis() - element.executeTime.getMillis
    Requests.indexRequest.index("test").`type`("event").source(element.json)
  }

  /**
   * Sink entry point: hands the index request built for `element` to the request indexer.
   *
   * @param element        event to index
   * @param runtimeContext context forwarded to [[createIndexRequest]]
   * @param requestIndexer indexer that receives the request
   */
  override def process(element: EventEntry,
                       runtimeContext: RuntimeContext,
                       requestIndexer: RequestIndexer): Unit =
    requestIndexer.add(createIndexRequest(element, runtimeContext))
}


But that does not seem to work. Does anyone know why?


Regards
wyp

Mime
View raw message