From: Dieter De Witte
To: user@hadoop.apache.org
Date: Tue, 29 Oct 2013 08:57:32 +0100
Subject: Re: Why the reducer's input group count is higher than my GroupComparator implementation

Did you override the partitioner as well?


2013/10/29 java8964 java8964 <java8964@hotmail.com>
Hi, I have a strange question about the secondary sort implementation in my MR job.
Currently I need to support secondary sort in one of my MR jobs. I implemented my custom WritableComparable like this:

public class MyPartitionKey implements WritableComparable<MyPartitionKey> {
    String type;
    long id1;
    String id2;
    String id3;
    String id4;
    long timestamp1;
    long timestamp2;
}

Then I implemented the following methods for this class:

public int compareTo(); // sort on all attributes listed above, with the last 2 timestamps descending
public int hashCode(); // generate the hash code from all attributes above
public boolean equals(); // use all the attributes for the equality check
public void write(DataOutput out) // serialize all the attributes listed above
public void readFields(DataInput in) // deserialize all the attributes listed above

For partitioning and grouping of my keys, I want the following logic:
Depending on the type, the data is partitioned either by year or by day of timestamp1.

For the sort order, I want the data sorted by (type, id1, id2, id3, id4), then reverse sorted by (timestamp1, timestamp2).
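
The sort order described above can be sketched in plain Java, without any Hadoop classes. The names SortKey and SortOrderSketch are hypothetical stand-ins for illustration only and are not the MyPartitionKey or KeyComparator from this message; the sketch only shows the intended ordering, ascending on (type, id1, id2, id3, id4) and descending on the two timestamps.

```java
import java.util.Comparator;

// Hypothetical plain-Java stand-in for the key; no Hadoop types are involved.
final class SortKey {
    final String type;
    final long id1;
    final String id2;
    final String id3;
    final String id4;
    final long timestamp1;
    final long timestamp2;

    SortKey(String type, long id1, String id2, String id3, String id4,
            long timestamp1, long timestamp2) {
        this.type = type;
        this.id1 = id1;
        this.id2 = id2;
        this.id3 = id3;
        this.id4 = id4;
        this.timestamp1 = timestamp1;
        this.timestamp2 = timestamp2;
    }
}

final class SortOrderSketch {
    // Ascending on (type, id1, id2, id3, id4), then descending on (timestamp1, timestamp2).
    static final Comparator<SortKey> SORT_ORDER =
        Comparator.comparing((SortKey k) -> k.type)
            .thenComparingLong(k -> k.id1)
            .thenComparing(k -> k.id2)
            .thenComparing(k -> k.id3)
            .thenComparing(k -> k.id4)
            .thenComparing(Comparator.comparingLong((SortKey k) -> k.timestamp1).reversed())
            .thenComparing(Comparator.comparingLong((SortKey k) -> k.timestamp2).reversed());
}
```

With this ordering, two keys that share (type, id1, id2, id3, id4) sort with the newer timestamp1 first, while keys with different ids are ordered by the ids before any timestamp is looked at.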

I implemented my KeyComparator using the sort order described above, and my Partitioner and GroupComparator using the partitioning logic described above.

Here is the pseudo code of the Partitioner and GroupComparator:

public class MyPartitioner implements Partitioner<MyPartitionKey, Value> {
    @Override
    public int getPartition(MyPartitionKey key, Value value, int numPartitions) {
        int hashCode = key.getActivityType().name().hashCode();
        StringBuilder sb = new StringBuilder();
        for (String subPartitionValue : key.getPartitionValue()) {
            sb.append(subPartitionValue);
        }
        return Math.abs(hashCode * 127 + sb.toString().hashCode()) % numPartitions;
    }

    @Override
    public void configure(JobConf job) {
    }
}

// The key's getPartitionValue method returns an array of strings, either {YYYY} or {YYYY, MM, DD}, derived from timestamp1.
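
A minimal sketch of what such a getPartitionValue could look like, again in plain Java. It assumes that timestamp1 is epoch milliseconds, that calendar fields are taken in UTC, and that a boolean flag says whether the type is partitioned daily; none of these details are stated in the message, and the class and method names are hypothetical. The partitionIndex helper mirrors the hash arithmetic of the partitioner above but masks the sign bit instead of calling Math.abs, because Math.abs(Integer.MIN_VALUE) is still negative.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;

final class PartitionValueSketch {
    // Assumption: the timestamp is epoch milliseconds, interpreted in UTC.
    static String[] partitionValue(long timestampMillis, boolean daily) {
        ZonedDateTime t = Instant.ofEpochMilli(timestampMillis).atZone(ZoneOffset.UTC);
        String year = String.format(Locale.ROOT, "%04d", t.getYear());
        if (!daily) {
            return new String[] { year };
        }
        String month = String.format(Locale.ROOT, "%02d", t.getMonthValue());
        String day = String.format(Locale.ROOT, "%02d", t.getDayOfMonth());
        return new String[] { year, month, day };
    }

    // Same hash combination as the partitioner above, with a sign-bit mask
    // so the result is always in the range [0, numPartitions).
    static int partitionIndex(String type, String[] partitionValue, int numPartitions) {
        int hash = type.hashCode() * 127 + String.join("", partitionValue).hashCode();
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }
}
```

Keys of the same type that fall on the same day (or in the same year, for yearly types) get the same partition value and therefore the same partition index, regardless of their ids.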

For GroupComparator:

    public static class MyGroupComparator extends WritableComparator {
        protected MyGroupComparator() {
            super(MyPartitionKey.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            MyPartitionKey key1 = (MyPartitionKey) w1;
            MyPartitionKey key2 = (MyPartitionKey) w2;
            int cmp = key1.type.compareTo(key2.type);
            // different type, send to different group
            if (cmp != 0)
                return cmp;

            // for the same type, both keys should have partition value arrays of the same length
            String[] partitionValue1 = key1.getPartitionValue();
            String[] partitionValue2 = key2.getPartitionValue();
            assert partitionValue1.length == partitionValue2.length;
            StringBuilder sb1 = new StringBuilder();
            StringBuilder sb2 = new StringBuilder();
            for (String subValue : partitionValue1) {
                sb1.append(subValue);
            }
            for (String subValue : partitionValue2) {
                sb2.append(subValue);
            }
            return sb1.toString().compareTo(sb2.toString());
        }
    }

Now, here is the strange problem I don't understand. I tested this with my MR job. In the test data I have 7 types of data: 3 of them are partitioned yearly and 4 of them daily. For the 4 types partitioned daily, there are 2 days of data for each type. So I expected the reducer's input group count to be 11, which is 4 x 2 + 3 = 11. In fact, if I don't use the custom MyPartitionKey and just use Text as the key type, with "type + YYYY" for the yearly datasets and "type + YYYYMMDD" for the daily datasets, there are 11 input groups for the reducer. But I have to support secondary sort. To my surprise, at runtime the MR job generates 51792 input groups for the reducer. This doesn't make sense.

If I change the MyGroupComparator compare() method to compare only the type, like this:
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            MyPartitionKey key1 = (MyPartitionKey) w1;
            MyPartitionKey key2 = (MyPartitionKey) w2;
            return key1.type.compareTo(key2.type);
        }
the MR job generates 7 input groups for the reducer, which is what I expect. But when I add the comparison of the YYYY, MM or DD values parsed out of timestamp1, the input group count becomes very large.

My guess is that id1, id2, id3 and id4 make the input group count large, because the test data contains many unique combinations of (id1, id2, id3, id4). But they are NOT part of my GroupComparator implementation. Why is the reducer's input group count so high in this case? As a result, the MR job won't do what I want, because data of the same group is NOT sent to the same reducer. Here is a summary of my questions:

1) My understanding is that the GroupComparator is the only class that controls the input groups of the reducer. Is that correct?
2) If so, in my case above, I know MyGroupComparator will produce 11 unique values for my test data. Why are 51792 input groups generated? This big number must come from (id1, id2, id3, id4), but these ids are not used in MyGroupComparator, so why do they affect the reducer input group count?
3) If I only use type in my GroupComparator, I get the correct 7 input groups for the reducer. So in this case it correctly ignores all the other data contained in the MyPartitionKey class. Why? Does the order of the attributes make any difference? I don't think so, but I cannot explain the above result.
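
One way to reason about the three questions is a small plain-Java simulation, not Hadoop code. The reduce side walks the keys in the order produced by the sort comparator and starts a new input group whenever the grouping comparator reports a difference between two neighbouring keys, so a group is a run of adjacent keys rather than a set of distinct grouping values. The sketch below uses a hypothetical reduced key (one id instead of four, a precomputed day string instead of timestamp1) and toy data; all names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class GroupRunSketch {
    // Hypothetical reduced key: one id instead of four, day string instead of a timestamp.
    static final class Key {
        final String type;
        final long id;
        final String day;

        Key(String type, long id, String day) {
            this.type = type;
            this.id = id;
            this.day = day;
        }
    }

    // Sort order in the spirit of the message: (type, id) ascending, then day descending.
    static final Comparator<Key> SORT =
        Comparator.comparing((Key k) -> k.type)
            .thenComparingLong(k -> k.id)
            .thenComparing(Comparator.comparing((Key k) -> k.day).reversed());

    static final Comparator<Key> GROUP_BY_TYPE =
        Comparator.comparing((Key k) -> k.type);

    static final Comparator<Key> GROUP_BY_TYPE_AND_DAY =
        Comparator.comparing((Key k) -> k.type).thenComparing(k -> k.day);

    // Counts runs of adjacent keys that the grouping comparator treats as equal.
    static int countGroups(List<Key> keys, Comparator<Key> sort, Comparator<Key> group) {
        List<Key> sorted = new ArrayList<>(keys);
        sorted.sort(sort);
        int groups = 0;
        Key previous = null;
        for (Key current : sorted) {
            if (previous == null || group.compare(previous, current) != 0) {
                groups++;
            }
            previous = current;
        }
        return groups;
    }

    // Toy data: one type, three ids, two days per id.
    static List<Key> sampleKeys() {
        List<Key> keys = new ArrayList<>();
        for (long id = 1; id <= 3; id++) {
            keys.add(new Key("A", id, "20131028"));
            keys.add(new Key("A", id, "20131029"));
        }
        return keys;
    }
}
```

On this toy data, countGroups with SORT and GROUP_BY_TYPE_AND_DAY yields 6 groups although only 2 distinct (type, day) values exist, because the id sits between type and day in the sort order and keeps interrupting the runs. GROUP_BY_TYPE yields 1 group, since type is the leading sort field. Sorting by (type, day, id) instead, so that the grouping fields form a prefix of the sort order, yields the expected 2 groups.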

If you have any idea, or if my implementation has a problem, please let me know.

Thanks

Yong
