hadoop-general mailing list archives

From Dennis <arsenep...@yahoo.com.cn>
Subject Re: RawComparator
Date Wed, 11 Aug 2010 01:07:10 GMT
Thanks, Josh, for your professional answers.

The following is my "composite key" class. In the Q1003InPairComparator.compare(byte[] b1,
int s1, int l1, byte[] b2, int s2, int l2) method, I came up with two ways of recovering the
original String values and comparing them. The first, "using streams", uses the java.io.*
package to deserialize the String values, after which the comparison is easy. The second,
"using byte[]", deserializes the byte[] by hand. This is not difficult, and it avoids creating
any java.io.* stream objects.
So,
1. What do you think of the two methods? Is there a better one?
2. If I use the first ("using streams") method, how should I deal with the IOException? If an
exception is thrown, what value should I return? (In the code below I simply return -1; I know
that is not a good solution.)


    public static class Q1003InPair implements WritableComparable<Q1003InPair> {
        private String dateStr;
        private String str;

        public void set(String dateStr, String str) {
            this.dateStr = dateStr;
            this.str = str;
        }

        public String getDateStr() {
            return this.dateStr;
        }

        public String getStr() {
            return this.str;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            dateStr = in.readUTF();
            str = in.readUTF();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(dateStr);
            out.writeUTF(str);
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result
                    + ((dateStr == null) ? 0 : dateStr.hashCode());
            result = prime * result + ((str == null) ? 0 : str.hashCode());
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            final Q1003InPair other = (Q1003InPair) obj;
            if (dateStr == null) {
                if (other.dateStr != null)
                    return false;
            } else if (!dateStr.equals(other.dateStr))
                return false;
            if (str == null) {
                if (other.str != null)
                    return false;
            } else if (!str.equals(other.str))
                return false;
            return true;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append(this.getDateStr());
            sb.append(",");
            sb.append(this.getStr());
            return sb.toString();
        }

        public static class Q1003InPairComparator extends WritableComparator {
            public Q1003InPairComparator() {
                super(Q1003InPair.class);
            }

            // Charset used to decode the strings in the "using byte[]" variant below
            private static final java.nio.charset.Charset UTF8 =
                    java.nio.charset.Charset.forName("UTF-8");

            @Override
            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
                // 1. using streams
                // ByteArrayInputStream bais1 = null;
                // ByteArrayInputStream bais2 = null;
                // DataInputStream dis1 = null;
                // DataInputStream dis2 = null;
                // try {
                //     bais1 = new ByteArrayInputStream(b1);
                //     dis1 = new DataInputStream(bais1);
                //     dis1.skip(s1);
                //     String str1 = dis1.readUTF();
                //     String str3 = dis1.readUTF();
                //
                //     bais2 = new ByteArrayInputStream(b2);
                //     dis2 = new DataInputStream(bais2);
                //     dis2.skip(s2);
                //     String str2 = dis2.readUTF();
                //     String str4 = dis2.readUTF();
                //
                //     if (str1.equals(str2)) {
                //         return str3.compareTo(str4);
                //     } else {
                //         return str1.compareTo(str2);
                //     }
                // } catch (Exception e) {
                // } finally {
                //     try { dis1.close(); } catch (Exception e) {}
                //     try { dis2.close(); } catch (Exception e) {}
                //     try { bais1.close(); } catch (Exception e) {}
                //     try { bais2.close(); } catch (Exception e) {}
                // }
                // return -1;

                // 2. using byte[]
                // writeUTF() writes an unsigned 2-byte big-endian length followed by
                // that many bytes of modified UTF-8. Java bytes are signed, so the
                // length must be read as unsigned (readUnsignedShort() masks each byte
                // with 0xff); otherwise a length of 200 (bytes 0x00 0xC8) reads as -56.
                // Decoding as UTF-8 is exact only for strings without U+0000 or
                // supplementary characters, where modified UTF-8 equals UTF-8.
                int strLength1 = readUnsignedShort(b1, s1);
                String str1 = new String(b1, s1 + 2, strLength1, UTF8);
                s1 += strLength1 + 2;
                int strLength3 = readUnsignedShort(b1, s1);
                String str3 = new String(b1, s1 + 2, strLength3, UTF8);

                int strLength2 = readUnsignedShort(b2, s2);
                String str2 = new String(b2, s2 + 2, strLength2, UTF8);
                s2 += strLength2 + 2;
                int strLength4 = readUnsignedShort(b2, s2);
                String str4 = new String(b2, s2 + 2, strLength4, UTF8);

                if (str1.equals(str2)) {
                    return str3.compareTo(str4);
                } else {
                    return str1.compareTo(str2);
                }
            }
        }

        static {
            WritableComparator.define(Q1003InPair.class,
                    new Q1003InPairComparator());
        }

        @Override
        public int compareTo(Q1003InPair o) {
            // Order by dateStr first, then by str
            int cmp = this.getDateStr().compareTo(o.getDateStr());
            if (cmp != 0) {
                return cmp;
            }
            return this.getStr().compareTo(o.getStr());
        }
    }
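For question 2, a minimal sketch of the "using streams" variant, assuming the key layout written by Q1003InPair.write() (two writeUTF() fields). Instead of returning -1, which violates the comparator contract (compare(a, b) and compare(b, a) would both be -1) and silently mis-sorts, it rethrows the IOException as an unchecked exception so the task fails loudly. Hadoop's own default WritableComparator.compare(byte[]...) appears to do the same, wrapping the IOException in a RuntimeException. It also bounds the stream with ByteArrayInputStream(b, start, length) instead of calling skip(). The class StreamCompareSketch and its helpers are hypothetical and use only java.io, so the sketch runs without Hadoop on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class StreamCompareSketch {

    // Compares two serialized keys laid out as writeUTF(dateStr), writeUTF(str):
    // dateStr first, then str, the same order as Q1003InPair.compareTo().
    static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
            String[] k1 = readPair(b1, s1, l1);
            String[] k2 = readPair(b2, s2, l2);
            int cmp = k1[0].compareTo(k2[0]);
            return cmp != 0 ? cmp : k1[1].compareTo(k2[1]);
        } catch (IOException e) {
            // Truncated or corrupt key bytes: no int is a correct answer here,
            // so fail loudly instead of returning -1 and mis-sorting silently.
            throw new RuntimeException("cannot deserialize key", e);
        }
    }

    // The stream covers only [start, start + length), so no skip() is needed.
    // Closing a ByteArrayInputStream has no effect, so there is no finally block.
    private static String[] readPair(byte[] b, int start, int length) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(b, start, length));
        String first = in.readUTF();
        String second = in.readUTF();
        return new String[] { first, second };
    }

    // Writes a pair exactly as Q1003InPair.write() does (demo/test helper).
    static byte[] serialize(String dateStr, String str) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(dateStr);
            out.writeUTF(str);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        byte[] a = serialize("2010-08-10", "b");
        byte[] b = serialize("2010-08-10", "a");
        System.out.println(compare(a, 0, a.length, b, 0, b.length) > 0); // true
    }
}
```

Inside a real WritableComparator subclass, the same try/catch would wrap the body of compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2).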

Thanks again.
Dennis


--- On Wed, 8/11/10, Josh Patterson <josh@cloudera.com> wrote:

From: Josh Patterson <josh@cloudera.com>
Subject: Re: RawComparator
To: mapreduce-user@hadoop.apache.org
Cc: general@hadoop.apache.org
Date: Wednesday, August 11, 2010, 3:21 AM

Dennis,

On Tue, Aug 10, 2010 at 4:01 AM, Dennis <arsenepark@yahoo.com.cn> wrote:

> Hi, guys,
>
> I am using hadoop 0.20.2, and I am trying to run the "SecondarySort"
> example. The following is the "FirstGroupingComparator" class, and I just
> cannot figure out how "WritableComparator.compareBytes(b1, s1,
> Integer.SIZE / 8, b2, s2, Integer.SIZE / 8)" works. There is very little
> javadoc for this class or this method.
> 1. Why is it "Integer.SIZE / 8"?
>

That says "take the size of an int in bits and divide it by 8". In Java an
int is always 32 bits wide, on 32-bit and 64-bit systems alike (the width is
fixed by the language, not the architecture), so this always gives
32 / 8 == 4. Integer.SIZE is the number of bits in the type; dividing by 8
turns it into bytes. So the call is saying "compare the first 4 bytes of each
byte array", i.e. the width, in bytes, of the first integer in the composite
key.
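A small sketch of the point above, using only the JDK: DataOutput.writeInt() always emits Integer.SIZE / 8 == 4 big-endian bytes, so a raw comparator can compare exactly those 4 bytes. One caveat, shown at the end: an unsigned byte-by-byte comparison (which is what compareBytes() does) sorts negative ints after positive ones unless the key is written as v - Integer.MIN_VALUE, which flips the sign bit. I believe the SecondarySort example's IntPair.write() uses this offset, but that is worth checking against the source. IntKeyBytes and its compareBytes() are hypothetical stand-ins for WritableComparator.compareBytes().

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class IntKeyBytes {

    // Unsigned, lexicographic byte comparison: a stand-in for
    // WritableComparator.compareBytes() so this runs without Hadoop.
    static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int n = Math.min(l1, l2);
        for (int i = 0; i < n; i++) {
            int x = b1[s1 + i] & 0xff;
            int y = b2[s2 + i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return l1 - l2;
    }

    // Serializes an int the way DataOutput.writeInt() does: 4 bytes, big-endian.
    static byte[] writeInt(int v) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(v);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        final int width = Integer.SIZE / 8;   // always 4: a Java int is 32 bits
        System.out.println(width);            // 4

        // Raw two's-complement bytes: -1 is FF FF FF FF, so it sorts after 1.
        System.out.println(compareBytes(writeInt(-1), 0, width, writeInt(1), 0, width) > 0); // true

        // Writing v - Integer.MIN_VALUE flips the sign bit and restores numeric order.
        System.out.println(compareBytes(writeInt(-1 - Integer.MIN_VALUE), 0, width,
                writeInt(1 - Integer.MIN_VALUE), 0, width) < 0); // true
    }
}
```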

WritableComparators are useful in the shuffle phase of Hadoop: we are
constantly comparing and sorting WritableComparables, and the secondary
sorting mechanics allow the data for a key to arrive at the reducer as one
group in a certain order (example: time-series data, where we want a range
of timestamps in one group, but we also want them in order when they are
processed inside the reducer).
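To make the grouping-versus-sorting idea concrete, here is an in-memory simulation in plain Java (not the Hadoop API): keys are sorted by the full composite key (k1, timestamp), then consecutive keys with the same k1 are collected into one group, which is the role a grouping comparator plays before reduce(). The Key class, the SORT and GROUP comparators, and groups() are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class SecondarySortSketch {

    // Hypothetical composite key: natural key k1 plus a timestamp.
    static final class Key {
        final String k1;
        final long ts;

        Key(String k1, long ts) {
            this.k1 = k1;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return k1 + "@" + ts;
        }
    }

    // Sort comparator: the full key, k1 then ts.
    static final Comparator<Key> SORT = new Comparator<Key>() {
        @Override
        public int compare(Key a, Key b) {
            int cmp = a.k1.compareTo(b.k1);
            return cmp != 0 ? cmp : Long.compare(a.ts, b.ts);
        }
    };

    // Grouping comparator: k1 only, so keys differing only in ts end up together.
    static final Comparator<Key> GROUP = new Comparator<Key>() {
        @Override
        public int compare(Key a, Key b) {
            return a.k1.compareTo(b.k1);
        }
    };

    // Sorts with SORT, then starts a new group wherever GROUP sees a new k1;
    // each inner list stands in for the group handed to one reduce() call.
    static List<List<Key>> groups(List<Key> keys) {
        List<Key> sorted = new ArrayList<Key>(keys);
        Collections.sort(sorted, SORT);
        List<List<Key>> result = new ArrayList<List<Key>>();
        List<Key> current = null;
        for (Key k : sorted) {
            if (current == null || GROUP.compare(current.get(0), k) != 0) {
                current = new ArrayList<Key>();
                result.add(current);
            }
            current.add(k);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Key> keys = Arrays.asList(new Key("b", 2), new Key("a", 3),
                new Key("b", 1), new Key("a", 1));
        System.out.println(groups(keys)); // [[a@1, a@3], [b@1, b@2]]
    }
}
```

Within each group the timestamps come out in ascending order, which is exactly the "in order inside the reducer" property described above.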


> 2. If I want to compare two Strings here, how should I write the code?
>
>     public static class FirstGroupingComparator implements
>             RawComparator<IntPair> {
>         @Override
>         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
>             int ret = WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
>                     b2, s2, Integer.SIZE / 8);
>             return ret;
>         }
>
>         @Override
>         public int compare(IntPair o1, IntPair o2) {
>             int l = o1.getFirst();
>             int r = o2.getFirst();
>             return l == r ? 0 : (l < r ? -1 : 1);
>         }
>     }
>

In the case of comparing strings, let's say for example you have a
"composite key" that has two String or Text members (k1, k2). We "group by"
the first part of the key, k1, and we sort by it as well (so ranges are
blocked together). This is very similar to the example above. Since with a
RawComparator we want to deserialize only a portion of the data to do the
comparison, compare() needs a strategy that takes into account that the
strings are variable length: we cannot simply read 4 bytes as in the integer
case. The challenge is to deserialize only the portion of the composite key
that contains the string/text you want to compare, and that portion is a
different number of bytes each time. Good places to look for ideas are the
Text class in Hadoop and WritableUtils.
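Applied to Dennis's key layout, the "deserialize only what you need" strategy can be sketched as follows, assuming both fields were written with DataOutput.writeUTF(): read the unsigned 2-byte length, compare that many bytes directly, and, for the sort comparator only, skip past the first field to reach the second. No String objects are created. As far as I can tell, the comparator of Hadoop's deprecated UTF8 class works the same way (readUnsignedShort() plus compareBytes()). The class and method names are hypothetical, and the byte helpers re-implement WritableComparator's so the sketch needs no Hadoop jars. The byte order of modified UTF-8 agrees with String.compareTo() except for strings containing U+0000.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class Utf8PairRawCompare {

    // writeUTF() emits an unsigned 2-byte big-endian length; mask each signed
    // byte with 0xff (what WritableComparator.readUnsignedShort() does).
    static int readUnsignedShort(byte[] b, int start) {
        return ((b[start] & 0xff) << 8) | (b[start + 1] & 0xff);
    }

    // Unsigned lexicographic comparison, same contract as compareBytes().
    static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int n = Math.min(l1, l2);
        for (int i = 0; i < n; i++) {
            int x = b1[s1 + i] & 0xff;
            int y = b2[s2 + i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return l1 - l2;
    }

    // Compares the writeUTF() fields starting at s1 and s2 without decoding them.
    private static int compareField(byte[] b1, int s1, byte[] b2, int s2) {
        int n1 = readUnsignedShort(b1, s1);
        int n2 = readUnsignedShort(b2, s2);
        return compareBytes(b1, s1 + 2, n1, b2, s2 + 2, n2);
    }

    // Grouping comparator: only the first field (dateStr) is examined.
    static int groupCompare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return compareField(b1, s1, b2, s2);
    }

    // Sort comparator: first field, then skip past it to the second field (str).
    static int sortCompare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int cmp = compareField(b1, s1, b2, s2);
        if (cmp != 0) {
            return cmp;
        }
        int next1 = s1 + 2 + readUnsignedShort(b1, s1);
        int next2 = s2 + 2 + readUnsignedShort(b2, s2);
        return compareField(b1, next1, b2, next2);
    }

    // Writes a pair exactly as Q1003InPair.write() does (demo/test helper).
    static byte[] serialize(String dateStr, String str) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(dateStr);
            out.writeUTF(str);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        byte[] a = serialize("2010-08-10", "b");
        byte[] b = serialize("2010-08-10", "a");
        System.out.println(groupCompare(a, 0, a.length, b, 0, b.length));    // 0
        System.out.println(sortCompare(a, 0, a.length, b, 0, b.length) > 0); // true
    }
}
```

For Text fields the length prefix is a variable-length VInt rather than 2 bytes. Text's own comparator skips it with WritableUtils.decodeVIntSize() before calling compareBytes(), and WritableComparator.readVInt(byte[], int) gives the length needed to skip to a second field.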

Josh Patterson
Cloudera



> Thanks.
> Dennis
>
>



      