From: Hyukjin Kwon
Date: Wed, 7 Dec 2016 19:19:05 +0900
Subject: Re: get corrupted rows using columnNameOfCorruptRecord
To: Yehuda Finkelstein
Cc: Michael Armbrust, user@spark.apache.org

Let me extend the suggestion a bit more verbosely. I think you could try something like this:

val jsonDF = spark.read
  .option("columnNameOfCorruptRecord", "xxx")
  .option("mode", "PERMISSIVE")
  .schema(StructType(schema.fields :+ StructField("xxx", StringType, true)))
  .json(corruptRecords)

val malformed = jsonDF.filter("xxx is not null").select("xxx")
malformed.show()

This prints something like the output below:

+------------+
|         xxx|
+------------+
|           {|
|{"a":1, b:2}|
|{"a":{, b:3}|
|           ]|
+------------+

If the schema is not specified, the inferred schema includes the malformed column automatically, but when the schema is specified explicitly, I believe this column has to be added manually.
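To connect the two halves of this thread, here is a minimal sketch of the same idea applied to the setup quoted below (schema pulled from an existing table, JSON lines read from /tmp/x, corrupt-record column named "xxx"); the shortened column list and the schemaWithCorrupt helper are illustrative assumptions, not code from the thread:

import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Schema of the target table (illustrative column names); the thread builds
// this from a "select ... limit 1" query against an existing table.
val df_schema = spark.sqlContext.sql("select c1, c2 from t1 limit 1")

// Append a nullable string field so PERMISSIVE mode has a place to store
// records that could not be parsed.
val schemaWithCorrupt =
  StructType(df_schema.schema.fields :+ StructField("xxx", StringType, nullable = true))

// Read the raw JSON lines and parse them with the extended schema.
val f = sc.textFile("/tmp/x")
val df = spark.sqlContext.read
  .option("columnNameOfCorruptRecord", "xxx")
  .option("mode", "PERMISSIVE")
  .schema(schemaWithCorrupt)
  .json(f)

// Corrupted rows are exactly those where the extra column is populated.
df.filter("xxx is not null").select("xxx").show(false)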
2016-12-07 18:06 GMT+09:00 Yehuda Finkelstein <yehuda@veracity-group.com>:

> Hi
>
> I tried it already but it says that this column doesn't exist.
>
> scala> var df = spark.sqlContext.read.
>      | option("columnNameOfCorruptRecord","xxx").
>      | option("mode","PERMISSIVE").
>      | schema(df_schema.schema).json(f)
> df: org.apache.spark.sql.DataFrame = [auctionid: string, timestamp: string ... 37 more fields]
>
> scala> df.select
> select   selectExpr
>
> scala> df.select("xxx").show
> org.apache.spark.sql.AnalysisException: cannot resolve '`xxx`' given input columns: […];;
>
>   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:269)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:279)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:283)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:283)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$8.apply(QueryPlan.scala:288)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:288)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:74)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
>   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2603)
>   at org.apache.spark.sql.Dataset.select(Dataset.scala:969)
>   at org.apache.spark.sql.Dataset.select(Dataset.scala:987)
>   ... 48 elided
>
> scala>
>
> From: Michael Armbrust [mailto:michael@databricks.com]
> Sent: Tuesday, December 06, 2016 10:26 PM
> To: Yehuda Finkelstein
> Cc: user
> Subject: Re: get corrupted rows using columnNameOfCorruptRecord
>
> .where("xxx IS NOT NULL") will give you the rows that couldn't be parsed.
>
> On Tue, Dec 6, 2016 at 6:31 AM, Yehuda Finkelstein <yehuda@veracity-group.com> wrote:
>
> Hi all
>
> I'm trying to parse JSON using an existing schema and got rows with NULLs.
>
> // get schema
> val df_schema = spark.sqlContext.sql("select c1,c2,…cn t1 limit 1")
>
> // read json file
> val f = sc.textFile("/tmp/x")
>
> // load json into data frame using schema
> var df = spark.sqlContext.read.option("columnNameOfCorruptRecord","xxx").option("mode","PERMISSIVE").schema(df_schema.schema).json(f)
>
> In the documentation it says that you can query the corrupted rows via this column -> columnNameOfCorruptRecord:
>
> o "columnNameOfCorruptRecord (default is the value specified in spark.sql.columnNameOfCorruptRecord): allows renaming the new field having malformed string created by PERMISSIVE mode. This overrides spark.sql.columnNameOfCorruptRecord."
>
> The question is how to fetch those corrupted rows?
>
> Thanks
>
> Yehuda
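As a counterpart to the explicit-schema variant above, here is a short sketch of the inference path described in the reply at the top of this message (no schema given, so the renamed corrupt-record column appears on its own when malformed lines are present); the file path and names are again illustrative, not from the thread:

// With no explicit schema, PERMISSIVE mode adds the corrupt-record column
// (renamed to "xxx" here) to the inferred schema whenever malformed lines
// are present, so it can be queried directly.
val inferred = spark.read
  .option("columnNameOfCorruptRecord", "xxx")
  .option("mode", "PERMISSIVE")
  .json("/tmp/x")

inferred.printSchema()  // "xxx" appears among the fields if corrupt lines exist

// Split the data: rows that parsed vs. the raw text of rows that did not,
// following the .where("xxx IS NOT NULL") suggestion from the thread.
val parsed    = inferred.where("xxx IS NULL").drop("xxx")
val corrupted = inferred.where("xxx IS NOT NULL").select("xxx")
corrupted.show(false)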