From: zentol
To: issues@flink.incubator.apache.org
Reply-To: issues@flink.incubator.apache.org
Subject: [GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Content-Type: text/plain
Message-Id: <20160908123552.6CE21DFF56@git1-us-west.apache.org>
Date: Thu, 8 Sep 2016 12:35:52 +0000 (UTC)

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77996545

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -94,20 +102,38 @@
         /** Backend for keyed state. This might be empty if we're not on a keyed stream.
         */
        private transient KeyedStateBackend keyedStateBackend;

    -   protected transient MetricGroup metrics;
    +   // --------------- Metrics ---------------------------
    +
    +   /** Metric group for the operator */
    +   protected MetricGroup metrics;
    +
    +   /** Flag indicating if this operator is a sink */
    +   protected transient boolean isSink = false;
    +
    +   protected LatencyGauge latencyGauge;
    +
        // ------------------------------------------------------------------------
        //  Life Cycle
        // ------------------------------------------------------------------------

        @Override
    -   public void setup(StreamTask containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
    +   public void setup(StreamTask containingTask, StreamConfig config, Output<StreamRecord<OUT>> output, boolean isSink) {
            this.container = containingTask;
            this.config = config;
            String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();

            this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
            this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
    +       this.isSink = isSink;
    +       Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
    +       int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
    +       if(historySize <= 0) {
    --- End diff --

    Missing space after "if": this line should read "if (historySize <= 0) {".