From issues-return-185229-archive-asf-public=cust-asf.ponee.io@spark.apache.org Fri Feb 16 17:46:07 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 7B3F918067B for ; Fri, 16 Feb 2018 17:46:06 +0100 (CET) Received: (qmail 92025 invoked by uid 500); 16 Feb 2018 16:46:04 -0000 Mailing-List: contact issues-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list issues@spark.apache.org Received: (qmail 91841 invoked by uid 99); 16 Feb 2018 16:46:04 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Feb 2018 16:46:04 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 18917C0412 for ; Fri, 16 Feb 2018 16:46:04 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -110.311 X-Spam-Level: X-Spam-Status: No, score=-110.311 tagged_above=-999 required=6.31 tests=[ENV_AND_HDR_SPF_MATCH=-0.5, RCVD_IN_DNSWL_MED=-2.3, SPF_PASS=-0.001, T_RP_MATCHES_RCVD=-0.01, USER_IN_DEF_SPF_WL=-7.5, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id Ewk_O72eAOhA for ; Fri, 16 Feb 2018 16:46:02 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTP id B60E75F522 for ; Fri, 16 Feb 2018 16:46:01 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id D4AD4E02AC for ; Fri, 16 Feb 2018 16:46:00 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 4608721E62 for ; Fri, 16 Feb 2018 16:46:00 +0000 (UTC) Date: Fri, 16 Feb 2018 16:46:00 +0000 (UTC) From: "Gabor Somogyi (JIRA)" To: issues@spark.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (SPARK-23288) Incorrect number of written records in structured streaming MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: quoted-printable X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/SPARK-23288?page=3Dcom.atlassia= n.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=3D163= 67569#comment-16367569 ]=20 Gabor Somogyi commented on SPARK-23288: --------------------------------------- Seems like no statsTrackers created in FileStreamSink. > Incorrect number of written records in structured streaming > ----------------------------------------------------------- > > Key: SPARK-23288 > URL: https://issues.apache.org/jira/browse/SPARK-23288 > Project: Spark > Issue Type: Bug > Components: SQL, Structured Streaming > Affects Versions: 2.2.0 > Reporter: Yuriy Bondaruk > Priority: Major > Labels: Metrics, metrics > > I'm using SparkListener.onTaskEnd() to capture input and output metrics b= ut it seems that number of written records ('taskEnd.taskMetrics().outputMe= trics().recordsWritten()') is incorrect. Here is my stream construction: > =C2=A0 > {code:java} > StreamingQuery writeStream =3D session > .readStream() > .schema(RecordSchema.fromClass(TestRecord.class)) > .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB) > .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF) > .csv(inputFolder.getRoot().toPath().toString()) > .as(Encoders.bean(TestRecord.class)) > .flatMap( > ((FlatMapFunction) (u) -> { > List resultIterable =3D new ArrayList<= >(); > try { > TestVendingRecord result =3D transformer.convert(u); > resultIterable.add(result); > } catch (Throwable t) { > System.err.println("Ooops"); > t.printStackTrace(); > } > return resultIterable.iterator(); > }), > Encoders.bean(TestVendingRecord.class)) > .writeStream() > .outputMode(OutputMode.Append()) > .format("parquet") > .option("path", outputFolder.getRoot().toPath().toString()) > .option("checkpointLocation", checkpointFolder.getRoot().toPath()= .toString()) > .start(); > writeStream.processAllAvailable(); > writeStream.stop(); > {code} > Tested it with one good and one bad (throwing exception in transformer.co= nvert(u)) input records and it produces following metrics: > =C2=A0 > {code:java} > (TestMain.java:onTaskEnd(73)) - -----------status--> SUCCESS > (TestMain.java:onTaskEnd(75)) - -----------recordsWritten--> 0 > (TestMain.java:onTaskEnd(76)) - -----------recordsRead-----> 2 > (TestMain.java:onTaskEnd(83)) - taskEnd.taskInfo().accumulables(): > (TestMain.java:onTaskEnd(84)) - name =3D duration total (min, med, max) > (TestMain.java:onTaskEnd(85)) - value =3D 323 > (TestMain.java:onTaskEnd(84)) - name =3D number of output rows > (TestMain.java:onTaskEnd(85)) - value =3D 2 > (TestMain.java:onTaskEnd(84)) - name =3D duration total (min, med, max) > (TestMain.java:onTaskEnd(85)) - value =3D 364 > (TestMain.java:onTaskEnd(84)) - name =3D internal.metrics.input.recordsRe= ad > (TestMain.java:onTaskEnd(85)) - value =3D 2 > (TestMain.java:onTaskEnd(84)) - name =3D internal.metrics.input.bytesRead > (TestMain.java:onTaskEnd(85)) - value =3D 157 > (TestMain.java:onTaskEnd(84)) - name =3D internal.metrics.resultSerializa= tionTime > (TestMain.java:onTaskEnd(85)) - value =3D 3 > (TestMain.java:onTaskEnd(84)) - name =3D internal.metrics.resultSize > (TestMain.java:onTaskEnd(85)) - value =3D 2396 > (TestMain.java:onTaskEnd(84)) - name =3D internal.metrics.executorCpuTime > (TestMain.java:onTaskEnd(85)) - value =3D 633807000 > (TestMain.java:onTaskEnd(84)) - name =3D internal.metrics.executorRunTime > (TestMain.java:onTaskEnd(85)) - value =3D 683 > (TestMain.java:onTaskEnd(84)) - name =3D internal.metrics.executorDeseria= lizeCpuTime > (TestMain.java:onTaskEnd(85)) - value =3D 55662000 > (TestMain.java:onTaskEnd(84)) - name =3D internal.metrics.executorDeseria= lizeTime > (TestMain.java:onTaskEnd(85)) - value =3D 58 > (TestMain.java:onTaskEnd(89)) - input records 2 > Streaming query made progress: { > "id" : "1231f9cb-b2e8-4d10-804d-73d7826c1cb5", > "runId" : "bd23b60c-93f9-4e17-b3bc-55403edce4e7", > "name" : null, > "timestamp" : "2018-01-26T14:44:05.362Z", > "numInputRows" : 2, > "processedRowsPerSecond" : 0.8163265306122448, > "durationMs" : { > "addBatch" : 1994, > "getBatch" : 126, > "getOffset" : 52, > "queryPlanning" : 220, > "triggerExecution" : 2450, > "walCommit" : 41 > }, > "stateOperators" : [ ], > "sources" : [ { > "description" : "FileStreamSource[file:/var/folders/4w/zks_kfls2s3glm= rj3f725p7hllyb5_/T/junit3661035412295337071]", > "startOffset" : null, > "endOffset" : { > "logOffset" : 0 > }, > "numInputRows" : 2, > "processedRowsPerSecond" : 0.8163265306122448 > } ], > "sink" : { > "description" : "FileSink[/var/folders/4w/zks_kfls2s3glmrj3f725p7hlly= b5_/T/junit3785605384928624065]" > } > } > {code} > The number of inputs is correct but the number of=C2=A0output records tak= en from taskEnd.taskMetrics().outputMetrics().recordsWritten() is zero. Acc= umulables (taskEnd.taskInfo().accumulables()) don't have a correct value as= well - should be 1 but it shows 2 'number of output rows'. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org For additional commands, e-mail: issues-help@spark.apache.org