hadoop-mapreduce-user mailing list archives

From Dennis <arsenep...@yahoo.com.cn>
Subject Re: RawComparator
Date Wed, 11 Aug 2010 01:11:01 GMT
Thanks, Josh, for your professional answers.

The following is my "composite key" class. In the Q1003InPairComparator.compare(byte[] b1,
int s1, int l1, byte[] b2, int s2, int l2) method, I came up with two ways of recovering the
original String values and comparing them. The first, "using streams", uses the java.io.*
package to deserialize the original String values, after which the comparison is easy. In the
second, "using byte[]", I decode the byte[] myself; it's not difficult, and I don't have to
create any java.io.* stream objects.
So:
1. What do you think of the two methods? Is there a better one?
2. If I use the first, "using streams", method, how should I deal with the IOException? If an
exception is thrown, what value should I return? (In the following code I simply return -1. I
know that's not a good idea.)
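A minimal sketch of the "using streams" variant with the IOException handled by rethrowing instead of returning -1. The class name StreamPairComparison and the serialize() helper are hypothetical; the sketch assumes the key is written as writeUTF(dateStr) followed by writeUTF(str), as in Q1003InPair.write(). Bounding each ByteArrayInputStream to (start, length) replaces the skip() call, and because closing a ByteArrayInputStream has no effect, the finally block can go. As far as I know, WritableComparator's own default byte-level compare() also wraps an IOException in a RuntimeException rather than inventing a result.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper class; the byte layout assumed is writeUTF(dateStr) + writeUTF(str).
final class StreamPairComparison {

    // Each stream is bounded to its own [start, start + length) slice, so no skip() is needed.
    // Closing a ByteArrayInputStream has no effect, so there is nothing to clean up.
    static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
            DataInputStream in1 = new DataInputStream(new ByteArrayInputStream(b1, s1, l1));
            DataInputStream in2 = new DataInputStream(new ByteArrayInputStream(b2, s2, l2));
            int cmp = in1.readUTF().compareTo(in2.readUTF());
            if (cmp != 0) {
                return cmp;
            }
            return in1.readUTF().compareTo(in2.readUTF());
        } catch (IOException e) {
            // Truncated or corrupt bytes have no meaningful order; returning -1 would
            // silently corrupt the sort, so fail loudly instead.
            throw new IllegalArgumentException("cannot deserialize key", e);
        }
    }

    // Hypothetical test helper: produces the same bytes Q1003InPair.write() would.
    static byte[] serialize(String dateStr, String str) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeUTF(dateStr);
            out.writeUTF(str);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        byte[] a = serialize("2010-08-10", "x");
        byte[] b = serialize("2010-08-11", "a");
        System.out.println(compare(a, 0, a.length, b, 0, b.length) < 0); // true
    }
}
```

An unchecked exception fails the task with a clear stack trace, which is usually preferable to a comparator that quietly violates its contract.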


    public static class Q1003InPair implements WritableComparable<Q1003InPair> {
        private String dateStr;
        private String str;

        public void set(String dateStr, String str) {
            this.dateStr = dateStr;
            this.str = str;
        }

        public String getDateStr() {
            return this.dateStr;
        }

        public String getStr() {
            return this.str;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            dateStr = in.readUTF();
            str = in.readUTF();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(dateStr);
            out.writeUTF(str);
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result
                    + ((dateStr == null) ? 0 : dateStr.hashCode());
            result = prime * result + ((str == null) ? 0 : str.hashCode());
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            final Q1003InPair other = (Q1003InPair) obj;
            if (dateStr == null) {
                if (other.dateStr != null)
                    return false;
            } else if (!dateStr.equals(other.dateStr))
                return false;
            if (str == null) {
                if (other.str != null)
                    return false;
            } else if (!str.equals(other.str))
                return false;
            return true;
        }

        public String toString() {
            StringBuffer sb = new StringBuffer();
            sb.append(this.getDateStr());
            sb.append(",");
            sb.append(this.getStr());
            return sb.toString();
        }

        public static class Q1003InPairComparator extends WritableComparator {
            public Q1003InPairComparator() {
                super(Q1003InPair.class);
            }

            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
                    int l2) {
                // 1. using streams
                // ByteArrayInputStream bais1 = null;
                // ByteArrayInputStream bais2 = null;
                // DataInputStream dis1 = null;
                // DataInputStream dis2 = null;
                // try {
                //     bais1 = new ByteArrayInputStream(b1);
                //     dis1 = new DataInputStream(bais1);
                //     dis1.skip(s1);
                //     String str1 = dis1.readUTF();
                //     String str3 = dis1.readUTF();
                //
                //     bais2 = new ByteArrayInputStream(b2);
                //     dis2 = new DataInputStream(bais2);
                //     dis2.skip(s2);
                //     String str2 = dis2.readUTF();
                //     String str4 = dis2.readUTF();
                //
                //     if (str1.equals(str2)) {
                //         return str3.compareTo(str4);
                //     } else {
                //         return str1.compareTo(str2);
                //     }
                // } catch (Exception e) {
                // } finally {
                //     try {
                //         dis1.close();
                //     } catch (Exception e) {}
                //     try {
                //         dis2.close();
                //     } catch (Exception e) {}
                //     try {
                //         bais1.close();
                //     } catch (Exception e) {}
                //     try {
                //         bais2.close();
                //     } catch (Exception e) {}
                // }
                // return -1;

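                // 2. using byte[]
                // writeUTF() stores a 2-byte unsigned big-endian length before each
                // string. Mask with 0xff: Java bytes are signed, so a length byte
                // >= 0x80 would otherwise be read as a negative number.
                // NOTE: new String(bytes, off, len) decodes with the platform default
                // charset, which matches writeUTF's modified UTF-8 only for ASCII text.
                int strLength1 = ((b1[s1] & 0xff) << 8) | (b1[s1 + 1] & 0xff);
                String str1 = new String(b1, s1 + 2, strLength1);
                s1 += strLength1 + 2;
                int strLength3 = ((b1[s1] & 0xff) << 8) | (b1[s1 + 1] & 0xff);
                String str3 = new String(b1, s1 + 2, strLength3);

                int strLength2 = ((b2[s2] & 0xff) << 8) | (b2[s2 + 1] & 0xff);
                String str2 = new String(b2, s2 + 2, strLength2);
                s2 += strLength2 + 2;
                int strLength4 = ((b2[s2] & 0xff) << 8) | (b2[s2 + 1] & 0xff);
                String str4 = new String(b2, s2 + 2, strLength4);

                if (str1.equals(str2)) {
                    return str3.compareTo(str4);
                } else {
                    return str1.compareTo(str2);
                }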
            }
        }

        static {
            WritableComparator.define(Q1003InPair.class,
                    new Q1003InPairComparator());
        }

        @Override
        public int compareTo(Q1003InPair o) {
            // Order by dateStr first, then by str (same sign as the raw comparator).
            int cmp = this.dateStr.compareTo(o.dateStr);
            if (cmp != 0) {
                return cmp;
            }
            return this.str.compareTo(o.str);
        }
    }
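A sketch of the "using byte[]" idea taken one step further: as far as I can tell, writeUTF() output for strings that contain no U+0000 characters sorts byte-wise (unsigned) in the same order as String.compareTo, so the comparator can compare the encoded bytes directly and never build String objects. The class and helper names are hypothetical; compareBytes and readUnsignedShort are reimplemented here only so the sketch runs without Hadoop on the classpath. Inside a WritableComparator subclass you would call WritableComparator.compareBytes and WritableComparator.readUnsignedShort instead.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper class; assumes the key layout writeUTF(dateStr) + writeUTF(str).
final class RawUtfPairComparison {

    // writeUTF() prefixes each string with an unsigned 16-bit big-endian length.
    static int readUnsignedShort(byte[] b, int off) {
        return ((b[off] & 0xff) << 8) | (b[off + 1] & 0xff);
    }

    // Unsigned lexicographic byte comparison; a shorter prefix sorts first
    // (the same contract as WritableComparator.compareBytes).
    static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int n = Math.min(l1, l2);
        for (int i = 0; i < n; i++) {
            int x = b1[s1 + i] & 0xff;
            int y = b2[s2 + i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return l1 - l2;
    }

    // Orders by dateStr, then str, straight from the encoded bytes.
    // l1/l2 are unused: the length prefixes say where each field ends.
    static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int n1 = readUnsignedShort(b1, s1);
        int n2 = readUnsignedShort(b2, s2);
        int cmp = compareBytes(b1, s1 + 2, n1, b2, s2 + 2, n2);
        if (cmp != 0) {
            return cmp;
        }
        // First fields are equal: jump over them to the second length prefix.
        int t1 = s1 + 2 + n1;
        int t2 = s2 + 2 + n2;
        return compareBytes(b1, t1 + 2, readUnsignedShort(b1, t1),
                            b2, t2 + 2, readUnsignedShort(b2, t2));
    }

    // Hypothetical test helper: same bytes as Q1003InPair.write().
    static byte[] serialize(String dateStr, String str) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeUTF(dateStr);
            out.writeUTF(str);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }
}
```

Whichever variant is used, its order has to agree with Q1003InPair.compareTo(), since Hadoop may compare the same keys either way.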

Thanks again.
Dennis


--- On Wed, 8/11/10, Josh Patterson <josh@cloudera.com> wrote:

From: Josh Patterson <josh@cloudera.com>
Subject: Re: RawComparator
To: mapreduce-user@hadoop.apache.org
Cc: general@hadoop.apache.org
Date: Wednesday, August 11, 2010, 3:21 AM

Dennis,

On Tue, Aug 10, 2010 at 4:01 AM, Dennis <arsenepark@yahoo.com.cn> wrote:


Hi, guys,

I am using Hadoop 0.20.2, and I am trying to run the "SecondarySort" example. The following
is its "FirstGroupingComparator" class, and I just cannot figure out how
"WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8, b2, s2, Integer.SIZE / 8)" works.
There is very little javadoc for this class or this method.


1. Why is it "Integer.SIZE / 8"?

That says "take the size of an int in bits and divide it by 8". Integer.SIZE is the number of
bits in the type, and in Java an int is always 32 bits whether the JVM runs on a 32- or 64-bit
architecture, so this is always 32 / 8 == 4. In other words: "compare the first 4 bytes of each
byte array", which is the width, in bytes, of the first integer in the composite key.
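To make the arithmetic concrete: DataOutput.writeInt() always emits 4 big-endian bytes, so comparing Integer.SIZE / 8 bytes compares exactly the first int of a serialized pair. One caveat: compareBytes() treats bytes as unsigned, so a raw two's-complement negative number sorts after every positive one. Flipping the sign bit before writing (equivalent to subtracting Integer.MIN_VALUE) fixes that. As far as I recall, the SecondarySort example's IntPair.write() stores the value minus Integer.MIN_VALUE for exactly this reason, but check the source for your version. The class and method names below are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical demo of comparing only the first 4 bytes of a serialized int pair.
final class IntPrefixComparison {

    // Integer.SIZE is 32 on every JVM, so this is always 4.
    static final int INT_BYTES = Integer.SIZE / 8;

    // Writes two ints (4 bytes each, big-endian). With flipSign, the sign bit is
    // flipped first so that unsigned byte order matches signed int order.
    static byte[] serializePair(int first, int second, boolean flipSign) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(flipSign ? first ^ Integer.MIN_VALUE : first);
            out.writeInt(flipSign ? second ^ Integer.MIN_VALUE : second);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Unsigned lexicographic comparison, the same contract as WritableComparator.compareBytes.
    static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int n = Math.min(l1, l2);
        for (int i = 0; i < n; i++) {
            int x = b1[s1 + i] & 0xff;
            int y = b2[s2 + i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return l1 - l2;
    }

    // What FirstGroupingComparator does: look only at the first INT_BYTES bytes.
    static int compareFirst(byte[] b1, byte[] b2) {
        return compareBytes(b1, 0, INT_BYTES, b2, 0, INT_BYTES);
    }

    public static void main(String[] args) {
        System.out.println(INT_BYTES); // 4
        // Without the flip, -1 (FF FF FF FF) sorts after 1 (00 00 00 01).
        System.out.println(compareFirst(serializePair(-1, 9, false), serializePair(1, 0, false)) > 0); // true
        // With the flip, -1 becomes 7F FF FF FF and 1 becomes 80 00 00 01.
        System.out.println(compareFirst(serializePair(-1, 9, true), serializePair(1, 0, true)) < 0); // true
    }
}
```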


WritableComparators matter in the shuffle phase of Hadoop: we are constantly comparing and
sorting WritableComparables, and the secondary sort mechanics let the group of data for a key
arrive at the reducer in a chosen order (for example, time series data, where we want a range
of timestamps in one group but also want them in order when they are processed inside the
reducer).
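A plain-Java simulation (not Hadoop code) of what the reducer ends up seeing: records sorted by the full composite key, then split into reduce() calls wherever the grouping comparator reports a change. The Rec type and its sensor/timestamp fields are hypothetical. In a real job the shuffle performs the sort, and the partitioner must also look only at the natural key (the example's FirstPartitioner) so that every record for a key reaches the same reducer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical simulation of secondary sort: sort by (sensor, ts), group by sensor.
final class SecondarySortSimulation {

    // Hypothetical composite key: natural key (sensor id) plus a timestamp.
    static final class Rec {
        final String sensor;
        final long ts;

        Rec(String sensor, long ts) {
            this.sensor = sensor;
            this.ts = ts;
        }
    }

    // Sort comparator: full composite key, natural key first, then timestamp.
    static final Comparator<Rec> SORT =
            Comparator.comparing((Rec r) -> r.sensor).thenComparingLong(r -> r.ts);

    // Grouping comparator: natural key only, like FirstGroupingComparator.
    static final Comparator<Rec> GROUP = Comparator.comparing((Rec r) -> r.sensor);

    // Returns the timestamps each simulated reduce() call would receive, in order.
    static List<List<Long>> reduceInputs(List<Rec> records) {
        List<Rec> sorted = new ArrayList<>(records);
        sorted.sort(SORT);
        List<List<Long>> groups = new ArrayList<>();
        Rec prev = null;
        for (Rec r : sorted) {
            // A new reduce() call starts whenever the grouping comparator sees a change.
            if (prev == null || GROUP.compare(prev, r) != 0) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(r.ts);
            prev = r;
        }
        return groups;
    }
}
```

For input records (b,3), (a,2), (b,1), (a,5), (a,1), the two reduce() calls receive timestamps [1, 2, 5] for "a" and [1, 3] for "b": one group per natural key, already in time order.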

 2. If I want to compare two Strings here, how should I write the code?



    public static class FirstGroupingComparator implements
            RawComparator<IntPair> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int ret = WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
                    b2, s2, Integer.SIZE / 8);
            return ret;
        }



        @Override
        public int compare(IntPair o1, IntPair o2) {
            int l = o1.getFirst();
            int r = o2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }


    }

In the case of comparing strings, let's say you have a "composite key" with two String or Text
members (k1, k2). We "group by" the first part of the key, k1, and we sort by it as well (we
block ranges together). This is very similar to the example above. Since with a RawComparator
we want to deserialize only the portion of the data needed for the comparison, your compare()
function needs a strategy that accounts for the strings being variable length (so we cannot
simply read 4 bytes as in the integer case). The challenge is to deserialize only the portion
of the composite key that contains the string/text you want to compare, which will be a
different number of bytes each time. Good places to look for ideas are the Text class in
Hadoop and WritableUtils.
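A sketch of that strategy for a key made of two Text members, grouping on k1 only. It assumes Text's serialized layout: a Hadoop vint byte length followed by the UTF-8 bytes. The vint helpers reimplement, for non-negative values only and as I understand the format, what WritableUtils.writeVInt and WritableUtils.decodeVIntSize do, so the sketch runs without Hadoop; inside Hadoop you would call those directly (Text.Comparator itself skips the length prefix with decodeVIntSize). All class and method names here are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical grouping comparison for a (Text k1, Text k2) key, looking at k1 only.
final class TextPairGrouping {

    // Assumed Hadoop vint format for n >= 0: 0..127 as one byte; otherwise a marker
    // byte (-112 - payload byte count) followed by big-endian payload bytes.
    static void writeVIntNonNegative(DataOutputStream out, int n) throws IOException {
        if (n <= 127) {
            out.writeByte(n);
            return;
        }
        int payload = 0;
        for (int tmp = n; tmp != 0; tmp >>>= 8) {
            payload++;
        }
        out.writeByte(-112 - payload);
        for (int i = payload - 1; i >= 0; i--) {
            out.writeByte(n >>> (8 * i)); // writeByte keeps only the low 8 bits
        }
    }

    // Total encoded size (marker plus payload) of a non-negative vint.
    static int vIntSize(byte first) {
        return first >= -112 ? 1 : -111 - first;
    }

    static int readVIntNonNegative(byte[] b, int off) {
        int size = vIntSize(b[off]);
        if (size == 1) {
            return b[off];
        }
        int n = 0;
        for (int i = 1; i < size; i++) {
            n = (n << 8) | (b[off + i] & 0xff);
        }
        return n;
    }

    // Unsigned lexicographic comparison, the same contract as WritableComparator.compareBytes.
    static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int n = Math.min(l1, l2);
        for (int i = 0; i < n; i++) {
            int x = b1[s1 + i] & 0xff;
            int y = b2[s2 + i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return l1 - l2;
    }

    // Assumed Text.write() layout for each member: vint length, then UTF-8 bytes.
    static byte[] serialize(String k1, String k2) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            for (String s : new String[] {k1, k2}) {
                byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
                writeVIntNonNegative(out, utf8.length);
                out.write(utf8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Read k1's length, compare exactly those bytes, and never touch k2.
    static int compareFirst(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return compareBytes(b1, s1 + vIntSize(b1[s1]), readVIntNonNegative(b1, s1),
                            b2, s2 + vIntSize(b2[s2]), readVIntNonNegative(b2, s2));
    }
}
```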

Josh Patterson
Cloudera




Thanks.
Dennis
