Return-Path: X-Original-To: apmail-spark-user-archive@minotaur.apache.org Delivered-To: apmail-spark-user-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6EA1817904 for ; Thu, 16 Apr 2015 18:41:15 +0000 (UTC) Received: (qmail 23620 invoked by uid 500); 16 Apr 2015 18:41:12 -0000 Delivered-To: apmail-spark-user-archive@spark.apache.org Received: (qmail 23540 invoked by uid 500); 16 Apr 2015 18:41:12 -0000 Mailing-List: contact user-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list user@spark.apache.org Received: (qmail 23530 invoked by uid 99); 16 Apr 2015 18:41:12 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Apr 2015 18:41:12 +0000 X-ASF-Spam-Status: No, hits=1.7 required=5.0 tests=FREEMAIL_ENVFROM_END_DIGIT,HTML_MESSAGE,RCVD_IN_DNSWL_LOW,SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (nike.apache.org: domain of gangele397@gmail.com designates 209.85.217.181 as permitted sender) Received: from [209.85.217.181] (HELO mail-lb0-f181.google.com) (209.85.217.181) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Apr 2015 18:40:46 +0000 Received: by lbbqq2 with SMTP id qq2so65488983lbb.3 for ; Thu, 16 Apr 2015 11:40:45 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=mime-version:in-reply-to:references:date:message-id:subject:from:to :content-type; bh=L3Qz+g+5OqVLGUh+/fOIA8J8w/wAwmDVAo8IXc5XZ6E=; b=Zx8fNKdcu3b1RVLBEDyY0bo5/2mR3C7VsQirsxWSstrGdMS1nD1RLXhfraHytzH75k 15nHoFwxRspgSHkzd9bX3uGurrUCpy07/pMtjRCDDvUDxBoA8Zs8OQ3zqOYa/fJksbeN av7CsuVm4W4fBefsqEC63E9XCoNTzZvTCL1kdo1676BKTjtywysQa+SvGFuqVsrx7f3O lXoiixXBfEBYVar/jZ0wTruRT/ccwjY3JvDRr6auJ1edmQE7ZGTktWsi9gqODmxnNguV wWee6hJpm5nLNZpeWZEajLkiEGJL1Ktn4B8Ho9dNtsFbVxdTZ1Te6GKNsauj0BrAy/nu aO8A== MIME-Version: 1.0 X-Received: by 10.112.171.101 with SMTP id at5mr29917589lbc.66.1429209645343; Thu, 16 Apr 2015 11:40:45 -0700 (PDT) Received: by 10.25.210.207 with HTTP; Thu, 16 Apr 2015 11:40:45 -0700 (PDT) In-Reply-To: References: Date: Fri, 17 Apr 2015 00:10:45 +0530 Message-ID: Subject: Re: Distinct is very slow From: Jeetendra Gangele To: Akhil Das , user Content-Type: multipart/alternative; boundary=001a11c38be23869b80513dbcd49 X-Virus-Checked: Checked by ClamAV on apache.org --001a11c38be23869b80513dbcd49 Content-Type: text/plain; charset=ISO-8859-1 at distinct level I will have 7000 times more elements in my RDD.So should I re partition? because its parent will definitely have less partition how to see through java code number of partition? On 16 April 2015 at 23:07, Jeetendra Gangele wrote: > No I did not tried the partitioning below is the full code > > public static void matchAndMerge(JavaRDD > matchRdd,JavaSparkContext jsc) throws IOException{ > long start = System.currentTimeMillis(); > JavaPairRDD RddForMarch > =matchRdd.zipWithIndex().mapToPair(new > PairFunction, Long, MatcherReleventData>() { > > @Override > public Tuple2 call(Tuple2 t) > throws Exception { > MatcherReleventData matcherData = new MatcherReleventData(); > Tuple2 tuple = new Tuple2 MatcherReleventData>(t._2, > matcherData.convertVendorDataToMatcherData(t._1)); > return tuple; > } > > }).cache(); > log.info("after index"+RddForMarch.take(1)); > Map tmp =RddForMarch.collectAsMap(); > Map matchData = new HashMap MatcherReleventData>(tmp); > final Broadcast> dataMatchGlobal = > jsc.broadcast(matchData); > > JavaPairRDD blockingRdd = RddForMarch.flatMapValues(new > Function>(){ > > @Override > public Iterable call(MatcherReleventData v1) > throws Exception { > List values = new ArrayList(); > HelperUtilities helper1 = new HelperUtilities(); > MatcherKeys matchkeys=helper1.getBlockinkeys(v1); > if(matchkeys.get_companyName() !=null){ > values.add(matchkeys.get_companyName()); > } > if(matchkeys.get_phoneNumberr() !=null){ > values.add(matchkeys.get_phoneNumberr()); > } > if(matchkeys.get_zipCode() !=null){ > values.add(matchkeys.get_zipCode()); > } > if(matchkeys.getM_domain() !=null){ > values.add(matchkeys.getM_domain()); > } > return values; > } > }); > log.info("blocking RDD is"+blockingRdd.count()); > int count=0; > log.info("Starting printing"); > for (Tuple2 entry : blockingRdd.collect()) { > > log.info(entry._1() + ":" + entry._2()); > count++; > } > log.info("total count"+count); > JavaPairRDD > completeDataToprocess=blockingRdd.flatMapValues( new Function Iterable>(){ > > @Override > public Iterable call(String v1) throws Exception { > return ckdao.getSingelkeyresult(v1); > } > }).distinct(32); > log.info("after hbase count is"+completeDataToprocess.count()); > log.info("data for process"+completeDataToprocess.take(1)); > JavaPairRDD> withScore > =completeDataToprocess.mapToPair( new PairFunction, > Long, Tuple2>(){ > > @Override > public Tuple2> call(Tuple2 t) > throws Exception { > Scoring scoreObj = new Scoring(); > double score =scoreObj.computeMatchScore(companyDAO.get(t._2()), > dataMatchGlobal.getValue().get(t._1())); > Tuple2 maptuple = new Tuple2(t._2(), > score); > Tuple2> tuple = new Tuple2 Tuple2>(t._1(), maptuple); > return tuple; > } > }); > log.info("with score tuple is"+withScore.take(1)); > JavaPairRDD> maxScoreRDD > =withScore.reduceByKey( new Function2, > Tuple2, Tuple2>(){ > > @Override > public Tuple2 call(Tuple2 v1, > Tuple2 v2) throws Exception { > int res =v1._2().compareTo(v2._2()); > if(res >0){ > Tuple2 result = new Tuple2(v1._1(), > v1._2()); > return result; > } > else if(res<0){ > Tuple2 result = new Tuple2(v2._1(), > v2._2()); > return result; > } > else{ > Tuple2 result = new Tuple2(v2._1(), > v2._2()); > return result; > } > } > }); > log.info("max score RDD"+maxScoreRDD.take(10)); > > maxScoreRDD.foreach( new > VoidFunction>>(){ > > @Override > public void call(Tuple2> t) > throws Exception { > MatcherReleventData matchedData=dataMatchGlobal.getValue().get(t._1()); > log.info("broadcast is"+dataMatchGlobal.getValue().get(t._1())); > //Set the score for better understanding of merge > matchedData.setScore(t._2()._2()); > vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(),"Souce_id"); > } > }); > log.info("took " + (System.currentTimeMillis() - start) + " mills to run > matcher"); > > > > } > > > On 16 April 2015 at 22:25, Akhil Das wrote: > >> Can you paste your complete code? Did you try repartioning/increasing >> level of parallelism to speed up the processing. Since you have 16 cores, >> and I'm assuming your 400k records isn't bigger than a 10G dataset. >> >> Thanks >> Best Regards >> >> On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele > > wrote: >> >>> I already checked and G is taking 1 secs for each task. is this too >>> much? if yes how to avoid this? >>> >>> >>> On 16 April 2015 at 21:58, Akhil Das wrote: >>> >>>> Open the driver ui and see which stage is taking time, you can look >>>> whether its adding any GC time etc. >>>> >>>> Thanks >>>> Best Regards >>>> >>>> On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele < >>>> gangele397@gmail.com> wrote: >>>> >>>>> Hi All I have below code whether distinct is running for more time. >>>>> >>>>> blockingRdd is the combination of and it will have 400K >>>>> records >>>>> JavaPairRDD >>>>> completeDataToprocess=blockingRdd.flatMapValues( new Function>>>> Iterable>(){ >>>>> >>>>> @Override >>>>> public Iterable call(String v1) throws Exception { >>>>> return ckdao.getSingelkeyresult(v1); >>>>> } >>>>> }).distinct(32); >>>>> >>>>> I am running distinct on 800K records and its taking 2 hours on 16 >>>>> cores and 20 GB RAM. >>>>> >>>> >>>> >>> >>> >>> >>> >> > > > > --001a11c38be23869b80513dbcd49 Content-Type: text/html; charset=ISO-8859-1 Content-Transfer-Encoding: quoted-printable
at distinct level I will have 7000 times more elements in = my RDD.So should I re partition? because its parent will definitely have le= ss partition how to see through java code number of partition?=A0

<= div class=3D"gmail_extra">
On 16 April 2015 a= t 23:07, Jeetendra Gangele <gangele397@gmail.com> wrote:<= br>
No I did not tried the partitioning below is the full co= de

public static =A0void =A0matchAndMerge(J= avaRDD<VendorRecord> matchRdd,JavaSparkContext jsc) throws IOExceptio= n{
long start =3D System.currentTimeM= illis();
JavaPairRDD<Long, MatcherReleventData>= RddForMarch =3DmatchRdd.zipWithIndex().mapToPair(new PairFunction<Tuple= 2<VendorRecord,Long>, Long, MatcherReleventData>() {
@Override
<= div> public Tuple2<Long, M= atcherReleventData> call(Tuple2<VendorRecord, Long> t)
<= span style=3D"white-space:pre-wrap"> throws Exception {
MatcherReleventData match= erData =3D new MatcherReleventData();
Tuple2<Long, MatcherReleventData> tuple =3D new = Tuple2<Long, MatcherReleventData>(t._2, matcherData.convertVendorData= ToMatcherData(t._1));
=
return = tuple;
}

}).cache();
log.info("after index"+RddForMarch.take(1));
Map<Long, MatcherReleventData> tmp =3DR= ddForMarch.collectAsMap();
= Map<Long, MatcherReleventData> matchData =3D new HashMap<L= ong, MatcherReleventData>(tmp);
final Broadcast<Map<Long, MatcherReleventData>>= dataMatchGlobal =3D jsc.broadcast(matchData);

JavaPairRDD<Long,String> blockingRdd =3D Rdd= ForMarch.flatMapValues(new Function<MatcherReleventData, Iterable<Str= ing>>(){

@Override
public Iterable<String> call(MatcherReleventData v1)
throws Exception {
List<String> values = =3D new ArrayList<String>();
HelperUtilities helper1 =3D new HelperUtilities();
<= div> MatcherKeys matchkeys= =3Dhelper1.getBlockinkeys(v1);
if(matchkeys.get_companyName() !=3Dnull){
values.add(matchkeys.get_companyNam= e());
}
if(matchkeys.get_phoneNumb= err() !=3Dnull){
values.add(matchkeys.get_phoneNumberr());
}
= if(matchkeys.get_zipCode() !=3Dnull){
values.add(matchkeys.get_zipCode());
=
}
if(matchkeys.getM_domain() !=3Dnull){=
values.add(matc= hkeys.getM_domain());
}
return values;
}
});
log.inf= o("blocking RDD is"+blockingRdd.count());
int count=3D0;
log.info("Starting printing");
=A0for (Tuple2<Long, String> entry := blockingRdd.collect()) {
= =A0
=A0<= a href=3D"http://log.info" target=3D"_blank">log.info(entry._1() + &quo= t;:" + entry._2());
<= /span> =A0 =A0 =A0count++;
= =A0 =A0}
= =A0log.info("total c= ount"+count);
<= /span>
JavaPairRDD&= lt;Long,Integer> completeDataToprocess=3DblockingRdd.flatMapValues( new = Function<String, Iterable<Integer>>(){

@Override
public Iterable<Integer> call= (String v1) throws Exception {
return ckdao.getSingelkeyresult(v1);
}
}= ).distinct(32);
log.info("after hbase count is&quo= t;+completeDataToprocess.count());
log.info("data for pro= cess"+completeDataToprocess.take(1));
<= /span>JavaPairRDD<Long, Tuple2<Integer, Double>> withScore =3Dc= ompleteDataToprocess.mapToPair( new PairFunction<Tuple2<Long,Integer&= gt;, Long, Tuple2<Integer, Double>>(){

@Override
public Tuple2<Long, Tuple2<Integ= er, Double>> call(Tuple2<Long, Integer> t)
throws Exception {
Scoring scoreObj =3D new Scoring()= ;
double score = =3DscoreObj.computeMatchScore(companyDAO.get(t._2()), dataMatchGlobal.getVa= lue().get(t._1()));
Tuple2<Integer, Double> maptuple =3D new Tuple2<Integer, Double= >(t._2(), score);
Tuple2<Long, Tuple2<Integer, Double>> tuple =3D new Tuple2&= lt;Long, Tuple2<Integer,Double>>(t._1(), maptuple);
return tuple;
}
<= /span>});
log.info("with score tuple is"+withScore.t= ake(1));
JavaPairRDD<Long, Tuple2&= lt;Integer,Double>> maxScoreRDD =3DwithScore.reduceByKey( new Functio= n2<Tuple2<Integer,Double>, Tuple2<Integer,Double>, Tuple2<= ;Integer,Double>>(){

@Override
public Tuple2<Integer, Double> call(Tuple2<Integer= , Double> v1,
Tuple2<Integer, Double> v2) throws Exception {
int res =3Dv1._2().compareTo(v2._2());
<= span style=3D"white-space:pre-wrap"> if(res >0){
Tuple2<Integer, Double> result =3D new= Tuple2<Integer, Double>(v1._1(), v1._2());
return result;
}
else if(res<0){
Tuple2<Integer, Double> result =3D new Tuple2<Integer, Double= >(v2._1(), v2._2());
= return result;
<= /span>}
else{
Tuple2<Integer, = Double> result =3D new Tuple2<Integer, Double>(v2._1(), v2._2());<= /div>
return result;
}
=
}
=
});
log.info("max scor= e RDD"+maxScoreRDD.take(10));

maxScoreRDD.foreach( new VoidFunction<Tuple2<Long,Tuple2= <Integer,Double>>>(){

@Override
public void call(Tuple2<Long, Tuple2<Integer, = Double>> t)
throws Exception {
MatcherReleventData matchedData=3DdataMatchGlobal.getValue().get(t._1()= );
log.info("broadcast is"+dataM= atchGlobal.getValue().get(t._1()));
//Set the score for better understanding of merge
<= div> matchedData.setScore(t.= _2()._2());
vdDoa= .updateMatchedRecordWithScore(matchedData, t._2()._1(),"Souce_id"= );
}
});
log.info("took " + (System.currentTimeMillis() - start= ) + " mills to run matcher");

= =A0
=A0
}


On 16 April 2015 at 22:25, Akhil Das <akhil@sigmoidanalytics.co= m> wrote:
Can you paste= your complete code? Did you try repartioning/increasing level of paralleli= sm to speed up the processing. Since you have 16 cores, and I'm assumin= g your 400k records isn't bigger than a 10G dataset.

ThanksBest Regards

On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra = Gangele <gangele397@gmail.com> wrote:
I already checked and G is taking 1 secs for each task. i= s this too much? if yes how to avoid this?


On 16 April 2015 at 21:58, Akhil Das= <akhil@sigmoidanalytics.com> wrote:
Open the driver ui and see which stage is taking ti= me, you can look whether its adding any GC time etc.

Thanks
Be= st Regards

On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra G= angele <gangele397@gmail.com> wrote:
=
Hi All I have below code whether distinct is running for m= ore time.

blockingRdd is the combination of <Long,String> and it will have= 400K records
JavaPairRDD<Long,Integer> completeDataTop= rocess=3DblockingRdd.flatMapValues( new Function<String, Iterable<Int= eger>>(){

@Override
public Iterable<Integer> call(String v1) throws Excepti= on {
return ckdao= .getSingelkeyresult(v1);
= }
}).distinct(32);

I am running distinct on 800K records and its ta= king 2 hours on 16 cores and 20 GB RAM.











--001a11c38be23869b80513dbcd49--