spark-user mailing list archives

From Radha krishna <grkmc...@gmail.com>
Subject Re: Spark Left outer Join issue using programmatic sql joins
Date Wed, 06 Jul 2016 16:44:58 GMT
Hi,

Thanks all, it is working fine now; the issue was some extra spaces in the
dept id.

I have one more doubt: for the non-matching records the output shows the word
"null", and even when I write to HDFS it writes "null". How can we avoid
writing "null" for the non-matching columns? I want just an empty value ("").

I used the same input for the dept table, except that I removed the last row,
and I used the code below to write to HDFS.

// Join EMP and DEPT, then write each row to HDFS as one record,
// using \u001c as the field separator and \u001e as the record terminator.
DataFrame joinResult = sqlContext.sql(
        "SELECT * FROM EMP e LEFT OUTER JOIN DEPT d ON e.deptid = d.deptid");
joinResult.javaRDD().repartition(1).map(new Function<Row, String>() {
    private static final long serialVersionUID = 9185646063977504742L;

    @Override
    public String call(Row arg0) throws Exception {
        String s = arg0.getString(0) + "\u001c" + arg0.getString(1) + "\u001c"
                + arg0.getString(2) + "\u001c" + arg0.getString(3) + "\u001c"
                + arg0.getString(4) + "\u001e";
        return s;
    }
}).saveAsTextFile(args[2]);
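
One way to avoid writing the literal word "null" in that map step (a minimal
sketch against the same joinResult, untested; the emptyIfNull helper name is
mine) is to check each column with Row.isNullAt and substitute an empty string:

JavaRDD<String> lines = joinResult.javaRDD().repartition(1)
        .map(new Function<Row, String>() {
            private static final long serialVersionUID = 1L;

            // Hypothetical helper: "" for a null column, otherwise its string value.
            private String emptyIfNull(Row row, int i) {
                return row.isNullAt(i) ? "" : row.getString(i);
            }

            @Override
            public String call(Row row) throws Exception {
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < 5; i++) {
                    sb.append(emptyIfNull(row, i));
                    sb.append(i < 4 ? "\u001c" : "\u001e"); // field vs record separator
                }
                return sb.toString();
            }
        });
lines.saveAsTextFile(args[2]);

This keeps the RDD-based write exactly as before; only the null columns are
replaced.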


Output in HDFS File

10 1001 aba 10 dev
10 1003 abd 10 dev
10 1005 abg 10 dev
10 1007 abj 10 dev
10 1010 abq 10 dev
20 1002 abs 20 Test
20 1006 abh 20 Test
20 1009 abl 20 Test
30 1004 abf null null
30 1008 abk null null

In my case I want to store the join result back into a database table, and it
is storing the word "null" for those non-matching records; I want to store ""
(an empty value) for the non-matching rows.

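If you load the join result into a database table with the DataFrame writer
instead, one option is to blank out the nulls before writing. This is a sketch
only: the JDBC URL, table name and credentials are placeholders, and
na().fill("") with a string value only affects string columns.

// Select explicit columns (avoids the duplicate deptid column from SELECT *),
// then replace nulls in the string columns with "".
DataFrame cleaned = sqlContext.sql(
        "SELECT e.deptid, e.id, e.name, d.deptname "
        + "FROM EMP e LEFT OUTER JOIN DEPT d ON e.deptid = d.deptid")
        .na().fill("");

// Placeholder connection details; substitute your own JDBC URL, table and credentials.
java.util.Properties props = new java.util.Properties();
props.setProperty("user", "dbuser");
props.setProperty("password", "dbpassword");
cleaned.write().jdbc("jdbc:mysql://dbhost:3306/mydb", "emp_dept_join", props);

The same cleaned DataFrame could also feed the HDFS output above, so the null
handling happens in one place.
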
On Wed, Jul 6, 2016 at 10:01 PM, Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> Hive query is
>
> hive> SELECT *  FROM emp e LEFT OUTER JOIN dept d ON e.deptid = d.deptid;
> Status: Finished successfully in 2.02 seconds
> OK
> 1001    aba     10      10      DEV
> 1002    abs     20      20      TEST
> 1003    abd     10      10      DEV
> 1001    aba     10      10      DEV
> 1002    abs     20      20      TEST
> 1003    abd     10      10      DEV
> 1004    abf     30      30      IT
> 1005    abg     10      10      DEV
> 1004    abf     30      30      IT
> 1005    abg     10      10      DEV
> 1006    abh     20      20      TEST
> 1007    abj     10      10      DEV
> 1006    abh     20      20      TEST
> 1007    abj     10      10      DEV
> 1008    abk     30      30      IT
> 1009    abl     20      20      TEST
> 1010    abq     10      10      DEV
> 1008    abk     30      30      IT
> 1009    abl     20      20      TEST
> 1010    abq     10      10      DEV
> Time taken: 44.608 seconds, Fetched: 20 row(s)
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 6 July 2016 at 17:28, Mich Talebzadeh <mich.talebzadeh@gmail.com>
> wrote:
>
>> This is very simple
>>
>> in Hive
>>
>> Status: Running (Hive on Spark job[1])
>> Job Progress Format
>> CurrentTime StageId_StageAttemptId:
>> SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount
>> [StageCost]
>> 2016-07-06 17:17:16,006 Stage-1_0: 0(+1)/1
>> 2016-07-06 17:17:17,011 Stage-1_0: 1/1 Finished
>> Status: Finished successfully in 2.02 seconds
>> OK
>> 1001    aba     10      10      DEV
>> 1002    abs     20      20      TEST
>> 1003    abd     10      10      DEV
>> 1001    aba     10      10      DEV
>> 1002    abs     20      20      TEST
>> 1003    abd     10      10      DEV
>> 1004    abf     30      30      IT
>> 1005    abg     10      10      DEV
>> 1004    abf     30      30      IT
>> 1005    abg     10      10      DEV
>> 1006    abh     20      20      TEST
>> 1007    abj     10      10      DEV
>> 1006    abh     20      20      TEST
>> 1007    abj     10      10      DEV
>> 1008    abk     30      30      IT
>> 1009    abl     20      20      TEST
>> 1010    abq     10      10      DEV
>> 1008    abk     30      30      IT
>> 1009    abl     20      20      TEST
>> 1010    abq     10      10      DEV
>>
>>
>> In Spark
>>
>> scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> HiveContext: org.apache.spark.sql.hive.HiveContext =
>> org.apache.spark.sql.hive.HiveContext@f9402c2
>>
>> scala> HiveContext.sql("use test")
>> res1: org.apache.spark.sql.DataFrame = [result: string]
>>
>> scala> val e = HiveContext.table("emp")
>> e: org.apache.spark.sql.DataFrame = [emp_id: int, name: string, deptid:
>> int]
>> scala> val d = HiveContext.table("dept")
>>
>> d: org.apache.spark.sql.DataFrame = [deptid: int, dept_name: string]
>> scala> val rs = e.join(d,e("deptid")===d("deptid"),
>> "fullouter").collect.foreach(println)
>> [1001,aba,10,10,DEV]
>> [1003,abd,10,10,DEV]
>> [1001,aba,10,10,DEV]
>> [1003,abd,10,10,DEV]
>> [1005,abg,10,10,DEV]
>> [1005,abg,10,10,DEV]
>> [1007,abj,10,10,DEV]
>> [1007,abj,10,10,DEV]
>> [1010,abq,10,10,DEV]
>> [1010,abq,10,10,DEV]
>> [1002,abs,20,20,TEST]
>> [1002,abs,20,20,TEST]
>> [1006,abh,20,20,TEST]
>> [1006,abh,20,20,TEST]
>> [1009,abl,20,20,TEST]
>> [1009,abl,20,20,TEST]
>> [1004,abf,30,30,IT]
>> [1004,abf,30,30,IT]
>> [1008,abk,30,30,IT]
>> [1008,abk,30,30,IT]
>>
>>
>>
>> Note that you need to enforce ordering
>>
>> HTH
>>
>>
>>
>>
>>
>> On 6 July 2016 at 14:00, ayan guha <guha.ayan@gmail.com> wrote:
>>
>>> looks like a data issue to me. Either EMP or DEPT has spaces in dept id
>>> for deptid=20,30.
>>>
>>> Did you check in hive cli?
>>>
>>> On Wed, Jul 6, 2016 at 10:33 PM, radha <grkmca95@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Please check below for the code, input and output. I think the output
>>>> is not correct; am I missing anything? Please guide.
>>>>
>>>> Code
>>>>
>>>> public class Test {
>>>>     private static JavaSparkContext jsc = null;
>>>>     private static SQLContext sqlContext = null;
>>>>     private static Configuration hadoopConf = null;
>>>>
>>>>     public static void main(String[] args) {
>>>>         jsc = GlobalSparkContext.getJavaSparkContext();
>>>>         sqlContext = GlobalSparkContext.getSQLContext(jsc);
>>>>
>>>>         hadoopConf = new Configuration(jsc.hadoopConfiguration());
>>>>         hadoopConf.set("textinputformat.record.delimiter",
>>>>                 GlobalSparkContext.lineSeparator);
>>>>
>>>>         try {
>>>>             final Emp emp = new Emp();
>>>>             final Dept dept = new Dept();
>>>>
>>>>             // Read the dept file, split each record and build Dept beans.
>>>>             JavaPairRDD<LongWritable, Text> deptinputLines =
>>>>                     jsc.newAPIHadoopFile(args[0], TextInputFormat.class,
>>>>                             LongWritable.class, Text.class, hadoopConf);
>>>>             JavaRDD<Dept> deptRDD = deptinputLines
>>>>                     .map(new Function<Tuple2<LongWritable, Text>, String>() {
>>>>                         @Override
>>>>                         public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
>>>>                             return arg0._2.toString();
>>>>                         }
>>>>                     }).map(new Function<String, Dept>() {
>>>>                         public Dept call(String recordLine) throws Exception {
>>>>                             String[] parts = recordLine.split(GlobalSparkContext.recordSeparator);
>>>>                             return getInstanceDept(parts, dept);
>>>>                         }
>>>>                     });
>>>>
>>>>             DataFrame deptDF = sqlContext.createDataFrame(deptRDD, Dept.class);
>>>>             deptDF.registerTempTable("DEPT");
>>>>             //deptDF.show();
>>>>
>>>>             // Read the emp file the same way and build Emp beans.
>>>>             JavaPairRDD<LongWritable, Text> inputLines =
>>>>                     jsc.newAPIHadoopFile(args[1], TextInputFormat.class,
>>>>                             LongWritable.class, Text.class, hadoopConf);
>>>>             JavaRDD<Emp> empRDD = inputLines
>>>>                     .map(new Function<Tuple2<LongWritable, Text>, String>() {
>>>>                         private static final long serialVersionUID = 3371707560417405016L;
>>>>
>>>>                         @Override
>>>>                         public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
>>>>                             return arg0._2.toString();
>>>>                         }
>>>>                     }).map(new Function<String, Emp>() {
>>>>                         private static final long serialVersionUID = 7656942162815285622L;
>>>>
>>>>                         public Emp call(String recordLine) throws Exception {
>>>>                             String[] parts = recordLine.split(GlobalSparkContext.recordSeparator);
>>>>                             return getInstance(parts, emp);
>>>>                         }
>>>>                     });
>>>>
>>>>             DataFrame empDF = sqlContext.createDataFrame(empRDD, Emp.class);
>>>>             empDF.registerTempTable("EMP");
>>>>
>>>>             // Left outer join via SQL.
>>>>             sqlContext.sql("SELECT * FROM EMP e LEFT OUTER JOIN DEPT d "
>>>>                     + "ON e.deptid = d.deptid").show();
>>>>
>>>>             // Equivalent DataFrame API call:
>>>>             //empDF.join(deptDF, empDF.col("deptid").equalTo(deptDF.col("deptid")), "leftouter").show();
>>>>         }
>>>>         catch (Exception e) {
>>>>             System.out.println(e);
>>>>         }
>>>>     }
>>>>
>>>>     public static Emp getInstance(String[] parts, Emp emp) throws ParseException {
>>>>         emp.setId(parts[0]);
>>>>         emp.setName(parts[1]);
>>>>         emp.setDeptid(parts[2]);
>>>>         return emp;
>>>>     }
>>>>
>>>>     public static Dept getInstanceDept(String[] parts, Dept dept) throws ParseException {
>>>>         dept.setDeptid(parts[0]);
>>>>         dept.setDeptname(parts[1]);
>>>>         return dept;
>>>>     }
>>>> }
>>>>
>>>> Input
>>>> Emp
>>>> 1001 aba 10
>>>> 1002 abs 20
>>>> 1003 abd 10
>>>> 1004 abf 30
>>>> 1005 abg 10
>>>> 1006 abh 20
>>>> 1007 abj 10
>>>> 1008 abk 30
>>>> 1009 abl 20
>>>> 1010 abq 10
>>>>
>>>> Dept
>>>> 10 dev
>>>> 20 Test
>>>> 30 IT
>>>>
>>>> Output
>>>> +------+------+----+------+--------+
>>>> |deptid|    id|name|deptid|deptname|
>>>> +------+------+----+------+--------+
>>>> |    10|  1001| aba|    10|     dev|
>>>> |    10|  1003| abd|    10|     dev|
>>>> |    10|  1005| abg|    10|     dev|
>>>> |    10|  1007| abj|    10|     dev|
>>>> |    10|  1010| abq|    10|     dev|
>>>> |    20|  1002| abs|  null|    null|
>>>> |    20|  1006| abh|  null|    null|
>>>> |    20|  1009| abl|  null|    null|
>>>> |    30|  1004| abf|  null|    null|
>>>> |    30|  1008| abk|  null|    null|
>>>> +------+------+----+------+--------+
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Left-outer-Join-issue-using-programmatic-sql-joins-tp27295.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>


-- 
Thanks & Regards
   Radha krishna
