Date: Mon, 7 Jan 2019 10:14:00 +0000 (UTC)
From: "Sidhartha (JIRA)"
To: issues@spark.apache.org
Subject: [jira] [Comment Edited] (SPARK-26558) java.util.NoSuchElementException while saving data into HDFS using Spark

    [ https://issues.apache.org/jira/browse/SPARK-26558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16735619#comment-16735619 ]

Sidhartha edited comment on SPARK-26558 at 1/7/19 10:13 AM:
------------------------------------------------------------

Ok. I applied all the parameters as per the official documentation given in [https://greenplum-spark.docs.pivotal.io/160/read_from_gpdb.html|http://example.com/]

The jars used are the same as those given in the documentation, and the partition parameters applied are also the same.


was (Author: bobbysidhartha):
    Ok. I applied all the parameters as per the official documentation given in [https://greenplum-spark.docs.pivotal.io/160/read_from_gpdb.html|http://example.com]

> java.util.NoSuchElementException while saving data into HDFS using Spark
> ------------------------------------------------------------------------
>
> Key: SPARK-26558
> URL: https://issues.apache.org/jira/browse/SPARK-26558
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, Spark Submit
> Affects Versions: 2.0.0
> Reporter: Sidhartha
> Priority: Major
> Attachments: OKVMg.png, k5EWv.png
>
>
> h1. !OKVMg.png!!k5EWv.png! How to fix java.util.NoSuchElementException while saving data into HDFS using Spark?
>
> I'm trying to ingest a greenplum table into HDFS using spark-greenplum reader.
> Below are the versions of Spark & Scala I am using:
> spark-core: 2.0.0
> spark-sql: 2.0.0
> Scala version: 2.11.8
> To do that, I wrote the following code:
>
> {code:java}
> val conf = new SparkConf().setAppName("TEST_YEAR")
>     .set("spark.executor.heartbeatInterval", "1200s")
>     .set("spark.network.timeout", "12000s")
>     .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
>     .set("spark.shuffle.compress", "true")
>     .set("spark.shuffle.spill.compress", "true")
>     .set("spark.sql.orc.filterPushdown", "true")
>     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>     .set("spark.kryoserializer.buffer.max", "512m")
>     .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName)
>     .set("spark.streaming.stopGracefullyOnShutdown", "true")
>     .set("spark.yarn.driver.memoryOverhead", "8192")
>     .set("spark.yarn.executor.memoryOverhead", "8192")
>     .set("spark.sql.shuffle.partitions", "400")
>     .set("spark.dynamicAllocation.enabled", "false")
>     .set("spark.shuffle.service.enabled", "true")
>     .set("spark.sql.tungsten.enabled", "true")
>     .set("spark.executor.instances", "12")
>     .set("spark.executor.memory", "13g")
>     .set("spark.executor.cores", "4")
>     .set("spark.files.maxPartitionBytes", "268435468")
> val flagCol = "del_flag"
> val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport()
>     .config("hive.exec.dynamic.partition", "true")
>     .config("hive.exec.dynamic.partition.mode", "nonstrict")
>     .getOrCreate()
> import spark.implicits._
> val dtypes = spark.read.format("jdbc").option("url", hiveMetaConURL)
>     .option("dbtable", "(select source_type, hive_type from hivemeta.types) as gpHiveDataTypes")
>     .option("user", metaUserName).option("password", metaPassword).load()
> val spColsDF = spark.read.format("jdbc").option("url", hiveMetaConURL)
>     .option("dbtable", "(select source_columns, precision_columns, partition_columns from hivemeta.source_table where tablename='gpschema.empdocs') as colsPrecision")
>     .option("user", metaUserName).option("password", metaPassword).load()
> val dataMapper = dtypes.as[(String, String)].collect().toMap
> val gpCols = spColsDF.select("source_columns").map(row => row.getString(0)).collect.mkString(",")
> val gpColumns = gpCols.split("\\|").map(e => e.split("\\:")).map(s => s(0)).mkString(",")
> val splitColumns = gpCols.split("\\|").toList
> val precisionCols = spColsDF.select("precision_columns").collect().map(_.getString(0)).mkString(",")
> val partition_columns = spColsDF.select("partition_columns").collect.flatMap(x => x.getAs[String](0).split(","))
> val prtn_String_columns = spColsDF.select("partition_columns").collect().map(_.getString(0)).mkString(",")
> val partCList = prtn_String_columns.split(",").map(x => col(x))
> var splitPrecisionCols = precisionCols.split(",")
> for (i <- splitPrecisionCols) {
>   precisionColsText += i.concat(s"::${textType} as ").concat(s"${i}_text")
>   textList += s"${i}_text:${textType}"
> }
> val pCols = precisionColsText.mkString(",")
> val allColumns = gpColumns.concat("," + pCols)
> val allColumnsSeq = allColumns.split(",").toSeq
> val allColumnsSeqC = allColumnsSeq.map(x => column(x))
> val gpColSeq = gpColumns.split(",").toSeq
> def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String, dataMapper: Map[String, String], partition_columns: Array[String], spark: SparkSession): DataFrame = {
>   val yearDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").option("url", connectionUrl)
>     .option("dbtable", "empdocs")
>     .option("dbschema","gpschema")
>     .option("user", devUserName).option("password", devPassword)
>     .option("partitionColumn","header_id")
>     .load()
>     .where("year=2017 and month=12")
>     .select(gpColSeq map col:_*)
>     .withColumn(flagCol, lit(0))
>   val totalCols: List[String] = splitColumns ++ textList
>   val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
>   val allCols = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
>   val resultDF = yearDF.select(allCols: _*)
>   val stringColumns = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
>   val finalDF = stringColumns.foldLeft(resultDF) { (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+", " ")) }
>   finalDF
> }
> val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
> dataDF.write.format("csv").save("hdfs://usrdev/apps/hive/warehouse/empdocs/")
> }
> }
> {code}
>
> When I submit the job, I see the tasks for the lines below complete:
> {code:java}
>
> val dataMapper = dtypes.as[(String, String)].collect().toMap
> val gpCols = spColsDF.select("source_columns").map(row => row.getString(0)).collect.mkString(",")
> val precisionCols = spColsDF.select("precision_columns").collect().map(_.getString(0)).mkString(",")
> val partition_columns = spColsDF.select("partition_columns").collect.flatMap(x => x.getAs[String](0).split(","))
> val prtn_String_columns = spColsDF.select("partition_columns").collect().map(_.getString(0)).mkString(",")
>
> {code}
>
> Once the task of saving the prepared dataframe starts, which is:
> {noformat}
> dataDF.write.format("csv").save("hdfs://usrdev/apps/hive/warehouse/empdocs/")
> {noformat}
> the job ends with the exception:
> {noformat}
> java.util.NoSuchElementException
> {noformat}
> I am submitting the job using the spark-submit command below:
> {code:java}
> SPARK_MAJOR_VERSION=2 spark-submit --class com.partition.source.YearPartition --master=yarn --conf spark.ui.port=4090 --driver-class-path /home/hdpdevusr/jars/greenplum-spark_2.11-1.3.0.jar --conf spark.jars=/home/hdpdevusr/jars/greenplum-spark_2.11-1.3.0.jar --executor-cores 3 --executor-memory 13G --keytab /home/hdpdevusr/hdpdevusr.keytab --principal hdpdevusr@usrdev.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/hdpdevusr/jars/greenplum-spark_2.11-1.3.0.jar splinter_2.11-0.1.jar
> {code}
> The command launches the executors as specified in the code, which is 12 executors with 4 cores each.
> Only 5 out of 48 tasks complete, and the job ends with the exception:
> {code:java}
> [Stage 5:> (0 + 48) / 64]18/12/27 10:29:10 WARN TaskSetManager: Lost task 6.0 in stage 5.0 (TID 11, executor 11): java.util.NoSuchElementException: None.get
>   at scala.None$.get(Option.scala:347)
>   at scala.None$.get(Option.scala:345)
>   at io.pivotal.greenplum.spark.jdbc.Jdbc$.copyTable(Jdbc.scala:43)
>   at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.liftedTree1$1(GreenplumRowIterator.scala:110)
>   at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.<init>(GreenplumRowIterator.scala:109)
>   at io.pivotal.greenplum.spark.GreenplumRDD.compute(GreenplumRDD.scala:49)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Job 5 cancelled because killed via the Web UI
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
>   at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1457)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply$mcVI$sp(DAGScheduler.scala:1446)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1439)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1439)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
>   at org.apache.spark.scheduler.DAGScheduler.handleStageCancellation(DAGScheduler.scala:1439)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1701)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:186)
>   ... 44 more
> 18/12/27 10:30:53 WARN ShutdownHookManager: ShutdownHook '$anon$2' timeout, java.util.concurrent.TimeoutException
> java.util.concurrent.TimeoutException
>   at java.util.concurrent.FutureTask.get(FutureTask.java:205)
>   at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:67)
> 18/12/27 10:30:53 ERROR Utils: Uncaught exception in thread pool-6-thread-1
> java.lang.InterruptedException
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Thread.join(Thread.java:1249)
>   at java.lang.Thread.join(Thread.java:1323)
>   at org.apache.spark.scheduler.LiveListenerBus.stop(LiveListenerBus.scala:199)
>   at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1919)
>   at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1317)
>   at org.apache.spark.SparkContext.stop(SparkContext.scala:1918)
>   at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:581)
>   at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
>   at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
>   at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
>   at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1948)
>   at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
>   at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
>   at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
>   at scala.util.Try$.apply(Try.scala:192)
>   at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
>   at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
>
> I don't understand where it went wrong, whether in the code or in a configuration applied to the job.
> I posted the same question on Stack Overflow as well. For the executor images, see the link below:
> [https://stackoverflow.com/questions/54002002/how-to-fix-java-util-nosuchelementexception-while-saving-data-into-hdfs-using-sp/54002423?noredirect=1#comment94843141_54002423|http://example.com]
> Could anyone let me know how to fix this exception?
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org
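
A note on the top frame of the trace quoted above: `java.util.NoSuchElementException: None.get` is what Scala raises when `.get` is called on an empty `Option`, and the trace places that call inside the connector's `Jdbc$.copyTable`. Which value was missing there cannot be determined from the trace alone. The sketch below only reproduces the mechanism on a hypothetical settings map; `NoneGetSketch`, `lookupSetting`, `unsafeRead` and `safeRead` are illustrative names and are not part of Spark or the Greenplum connector.

```scala
import scala.util.Try

object NoneGetSketch {
  // Hypothetical lookup standing in for any call that may find nothing.
  // This is NOT the connector's code, only an Option-returning example.
  def lookupSetting(settings: Map[String, String], key: String): Option[String] =
    settings.get(key)

  // Calling .get on None throws java.util.NoSuchElementException("None.get"),
  // the same exception type and message seen in the quoted trace.
  def unsafeRead(settings: Map[String, String], key: String): String =
    lookupSetting(settings, key).get

  // Converting to Either keeps the name of the missing key in the error.
  def safeRead(settings: Map[String, String], key: String): Either[String, String] =
    lookupSetting(settings, key).toRight(s"missing setting: $key")

  def main(args: Array[String]): Unit = {
    // Assumed sample data: one key present, one absent.
    val settings = Map("dbtable" -> "empdocs")
    println(Try(unsafeRead(settings, "partitionColumn"))) // a Failure wrapping the exception
    println(safeRead(settings, "partitionColumn"))        // a Left naming the missing key
    println(safeRead(settings, "dbtable"))                // a Right with the value
  }
}
```

The point of the `safeRead` variant is only that an explicit error names what was absent, whereas a bare `None.get` does not; it says nothing about what the connector itself expected to find.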