From: Anchit Choudhry <anchit.choudhry@gmail.com>
Date: Fri, 25 Sep 2015 01:12:43 -0400
Subject: Re: How to get the HDFS path for each RDD
To: Fengdong Yu <fengdongy@everstring.com>
Cc: dev@spark.apache.org

Hi Fengdong,

So I created two files in HDFS under a test folder.
test/dt=20100101.json
{ "key1" : "value1" }

test/dt=20100102.json
{ "key2" : "value2" }

Then, inside the PySpark shell:

rdd = sc.wholeTextFiles('./test/*')
rdd.collect()
[(u'hdfs://localhost:9000/user/hduser/test/dt=20100101.json', u'{ "key1" : "value1" }'),
 (u'hdfs://localhost:9000/user/hduser/test/dt=20100102.json', u'{ "key2" : "value2" }')]

import json
def editMe(y, x):
    j = json.loads(y)
    j['source'] = x
    return j

rdd.map(lambda (x, y): editMe(y, x)).collect()
[{'source': u'hdfs://localhost:9000/user/hduser/test/dt=20100101.json', u'key1': u'value1'},
 {u'key2': u'value2', 'source': u'hdfs://localhost:9000/user/hduser/test/dt=20100102.json'}]

Similarly, you could modify the function to derive 'source' and 'date' from the path with some string manipulation, per your requirements (a rough sketch follows the quoted thread below).

Let me know if this helps.

Thanks,
Anchit

On 24 September 2015 at 23:55, Fengdong Yu <fengdongy@everstring.com> wrote:

> Yes. For example, I have two data sets:
>
> data set A: /data/test1/dt=20100101
> data set B: /data/test2/dt=20100202
>
> All the data has the same JSON format, such as:
> {"key1" : "value1", "key2" : "value2"}
>
> My expected output:
> {"key1" : "value1", "key2" : "value2", "source" : "test1", "date" : "20100101"}
> {"key1" : "value1", "key2" : "value2", "source" : "test2", "date" : "20100202"}
>
> On Sep 25, 2015, at 11:52, Anchit Choudhry <anchit.choudhry@gmail.com> wrote:
>
> Sure. May I ask for a sample input (could be just a few lines) and the output you are expecting, to bring clarity to my thoughts?
>
> On Thu, Sep 24, 2015, 23:44 Fengdong Yu <fengdongy@everstring.com> wrote:
>
>> Hi Anchit,
>>
>> Thanks for the quick answer.
>>
>> My exact question is: I want to add the HDFS location into each line of my JSON data.
>>
>> On Sep 25, 2015, at 11:25, Anchit Choudhry <anchit.choudhry@gmail.com> wrote:
>>
>> Hi Fengdong,
>>
>> Thanks for your question.
>>
>> Spark already has a function called wholeTextFiles on the SparkContext which can help you with that.
>>
>> Python
>>
>> Given files such as:
>>
>> hdfs://a-hdfs-path/part-00000
>> hdfs://a-hdfs-path/part-00001
>> ...
>> hdfs://a-hdfs-path/part-nnnnn
>>
>> rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")
>>
>> produces a pair RDD of (path, content):
>>
>> (a-hdfs-path/part-00000, its content)
>> (a-hdfs-path/part-00001, its content)
>> ...
>> (a-hdfs-path/part-nnnnn, its content)
>>
>> More info: http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=wholetext#pyspark.SparkContext.wholeTextFiles
>>
>> ------------
>>
>> Scala
>>
>> val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")
>>
>> More info: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext@wholeTextFiles(String,Int):RDD[(String,String)]
>>
>> Let us know if this helps or you need more help.
>>
>> Thanks,
>> Anchit Choudhry
>>
>> On 24 September 2015 at 23:12, Fengdong Yu <fengdongy@everstring.com> wrote:
>>
>>> Hi,
>>>
>>> I have multiple files in JSON format, such as:
>>>
>>> /data/test1_data/sub100/test.data
>>> /data/test2_data/sub200/test.data
>>>
>>> I can sc.textFile("/data/*/*"),
>>>
>>> but I want to add {"source" : "HDFS_LOCATION"} to each line and then save it to one target HDFS location.
>>>
>>> How do I do that? Thanks.
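[Editor's note] To make the 'source' and 'date' suggestion above concrete, here is a minimal PySpark sketch against Fengdong's /data/<source>/dt=<date> layout. The path-parsing rules and the /data/merged output location are assumptions for illustration, not part of the original thread:

import json

def tag_record(path, content):
    # Assumed layout: .../<source>/dt=<date>[/<file>] -- hypothetical.
    segments = path.rstrip('/').split('/')
    date_seg = next(s for s in segments if s.startswith('dt='))
    source = segments[segments.index(date_seg) - 1]    # e.g. 'test1'
    date = date_seg.split('=', 1)[1].split('.')[0]     # e.g. '20100101'
    tagged = []
    for line in content.splitlines():
        if not line.strip():
            continue
        record = json.loads(line)                      # one JSON object per line
        record['source'] = source
        record['date'] = date
        tagged.append(json.dumps(record))
    return tagged

# wholeTextFiles yields one (path, whole-file content) pair per file,
# here spanning both data sets via a glob.
rdd = sc.wholeTextFiles('/data/*/dt=*')
rdd.flatMap(lambda kv: tag_record(kv[0], kv[1])) \
   .saveAsTextFile('/data/merged')                     # assumed single target location

Because wholeTextFiles loads each file's full content into a single record, this approach suits many small files; for large files a different strategy would be needed.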