hadoop-mapreduce-user mailing list archives

From Max Lebedev <ma...@actionx.com>
Subject Re: Mapreduce using JSONObjects
Date Wed, 05 Jun 2013 23:10:24 GMT
I’ve taken your advice and made a wrapper class that implements
WritableComparable. Thank you very much for your help; I believe everything
is working fine on that front. I used Google's Gson for the comparison:


public int compareTo(Object o) {
    JsonElement o1 = PARSER.parse(this.json.toString());
    JsonElement o2 = PARSER.parse(o.toString());
    if (o2.equals(o1))
        return 0;
    else
        return -1;
}
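
For instance, both of these parse to JsonElements that compare as equal even
though the field order differs:

JsonElement a = PARSER.parse("{\"x\":1,\"y\":2}");
JsonElement b = PARSER.parse("{\"y\":2,\"x\":1}");
// a.equals(b) is true: Gson's object equality ignores member order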


The problem I have now is that only consecutive duplicates are detected.
Given 6 lines:

{"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}

{"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":false}

{"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":true}

{"ts":1368758947.291035,"isSecure":false,"version":2,"source":"sdk","debug":false}

{"ts":1368758947.291035,
"source":"sdk","isSecure":false,"version":2,"debug":false}

{"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}


I get back 1, 3, 4, and 6. I should be getting only 1, 3, and 4, since 6 is
exactly equal to 1. If I swap lines 5 and 6, the original line 5 is no longer
filtered (I get 1, 3, 4, 5, 6). I’ve noticed that the compareTo method is
called a total of 13 times. I assume that for all 6 keys to be compared
against each other, 15 comparisons would need to be made. Am I missing
something here? I’ve tested compareTo manually, and lines 1 and 6 are
interpreted as equal. My MapReduce code currently looks like this:


class DupFilter {

    private static final Gson GSON = new Gson();
    private static final JsonParser PARSER = new JsonParser();

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, JSONWrapper, IntWritable> {

        public void map(LongWritable key, Text value,
                OutputCollector<JSONWrapper, IntWritable> output,
                Reporter reporter) throws IOException {
            JsonElement je = PARSER.parse(value.toString());
            JSONWrapper jow = new JSONWrapper(value.toString());
            IntWritable one = new IntWritable(1);
            output.collect(jow, one);
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<JSONWrapper, IntWritable, JSONWrapper, IntWritable> {

        public void reduce(JSONWrapper jow, Iterator<IntWritable> values,
                OutputCollector<JSONWrapper, IntWritable> output,
                Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext())
                sum += values.next().get();
            output.collect(jow, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(DupFilter.class);
        conf.setJobName("dupfilter");
        conf.setOutputKeyClass(JSONWrapper.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}

Thanks,

Max Lebedev


On Tue, Jun 4, 2013 at 10:58 PM, Rahul Bhattacharjee <
rahul.rec.dgp@gmail.com> wrote:

> I agree with Shahab: you have to ensure that the keys are WritableComparable
> and the values are Writable in order to be used in MR.
>
> You can have a WritableComparable implementation wrapping the actual json
> object.
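>
> Something along these lines (just a skeleton, untested):
>
> import java.io.DataInput;
> import java.io.DataOutput;
> import java.io.IOException;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.io.WritableComparable;
>
> public class JSONWrapper implements WritableComparable<JSONWrapper> {
>     private Text json = new Text();
>
>     public JSONWrapper() {}                       // Hadoop needs a no-arg ctor
>     public JSONWrapper(String s) { json.set(s); }
>
>     public void write(DataOutput out) throws IOException {
>         json.write(out);
>     }
>
>     public void readFields(DataInput in) throws IOException {
>         json.readFields(in);
>     }
>
>     public int compareTo(JSONWrapper other) {
>         // raw string order here; if field order must not matter, compare
>         // a canonical (sorted-key) rendering of the json instead
>         return json.toString().compareTo(other.json.toString());
>     }
>
>     // keep hashCode consistent so partitioning groups equal keys
>     public int hashCode() { return json.hashCode(); }
>
>     public String toString() { return json.toString(); }
> }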
>
> Thanks,
> Rahul
>
>
> On Wed, Jun 5, 2013 at 5:09 AM, Mischa Tuffield <mischa@mmt.me.uk> wrote:
>
>> Hello,
>>
>> On 4 Jun 2013, at 23:49, Max Lebedev <max.l@actionx.com> wrote:
>>
>> Hi. I've been trying to use JSONObjects to identify duplicates in JSON
>> strings. The duplicate strings contain the same data, but not necessarily
>> in the same order. For example, the following two lines should be
>> identified as duplicates (and filtered):
>>
>>
>> {"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false
>>
>> {"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":false}
>>
>> Can you not use the timestamp as the key and emit the records keyed on
>> it? That is, have your mapper emit the following kv:
>>
>> output.collect(ts, value);
>>
>> And you would have a straightforward reducer that can dedup based on the
>> timestamps.
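>>
>> Something like this, say (a sketch against the old API, assuming the
>> mapper emits Text keys):
>>
>> public void reduce(Text ts, Iterator<Text> lines,
>>         OutputCollector<NullWritable, Text> output, Reporter reporter)
>>         throws IOException {
>>     // records with the same timestamp arrive together;
>>     // keep the first one and drop the rest
>>     output.collect(NullWritable.get(), lines.next());
>> }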
>>
>> If the above doesn't work for you, I would look at the Jackson library
>> for mangling JSON in Java. Its approach of mapping JSON to Java beans is
>> clean from a code point of view and comes with lots of nice features.
>> http://stackoverflow.com/a/2255893
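>>
>> For example (a sketch; json1 and json2 stand in for two of your input
>> lines, and Map.equals ignores key order, so field order stops mattering):
>>
>> ObjectMapper mapper = new ObjectMapper();
>> Map<String, Object> a = mapper.readValue(json1, Map.class);
>> Map<String, Object> b = mapper.readValue(json2, Map.class);
>> boolean duplicate = a.equals(b);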
>>
>> P.S. In your code you are using the older MapReduce API; I would look at
>> using the newer API in the org.apache.hadoop.mapreduce package.
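>>
>> Your mapper might then look something like this (a sketch; extractTs() is
>> hypothetical and stands for however you pull the timestamp out):
>>
>> public static class Map
>>         extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {
>>     @Override
>>     protected void map(LongWritable key, Text value, Context context)
>>             throws IOException, InterruptedException {
>>         // extractTs() is hypothetical: derive the dedup key from the record
>>         context.write(extractTs(value), value);
>>     }
>> }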
>>
>> Mischa
>>
>> This is the code:
>>
>> class DupFilter{
>>
>>     public static class Map extends MapReduceBase
>>             implements Mapper<LongWritable, Text, JSONObject, Text> {
>>
>>         public void map(LongWritable key, Text value,
>>                 OutputCollector<JSONObject, Text> output, Reporter reporter)
>>                 throws IOException {
>>             JSONObject jo = null;
>>             try {
>>                 jo = new JSONObject(value.toString());
>>             } catch (JSONException e) {
>>                 e.printStackTrace();
>>             }
>>             output.collect(jo, value);
>>         }
>>     }
>>
>>     public static class Reduce extends MapReduceBase
>>             implements Reducer<JSONObject, Text, NullWritable, Text> {
>>
>>         public void reduce(JSONObject jo, Iterator<Text> lines,
>>                 OutputCollector<NullWritable, Text> output, Reporter reporter)
>>                 throws IOException {
>>             output.collect(null, lines.next());
>>         }
>>     }
>>
>>     public static void main(String[] args) throws Exception {
>>         JobConf conf = new JobConf(DupFilter.class);
>>         conf.setOutputKeyClass(JSONObject.class);
>>         conf.setOutputValueClass(Text.class);
>>         conf.setMapperClass(Map.class);
>>         conf.setReducerClass(Reduce.class);
>>         conf.setInputFormat(TextInputFormat.class);
>>         conf.setOutputFormat(TextOutputFormat.class);
>>         FileInputFormat.setInputPaths(conf, new Path(args[0]));
>>         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
>>         JobClient.runJob(conf);
>>     }
>> }
>>
>> I get the following error:
>>
>>
>> java.lang.ClassCastException: class org.json.JSONObject
>>         at java.lang.Class.asSubclass(Class.java:3027)
>>         at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
>>         at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:817)
>>         at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:383)
>>         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
>>         at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
>>
>> It looks like it has something to do with conf.setOutputKeyClass(). Am I
>> doing something wrong here?
>>
>>
>> Thanks,
>>
>> Max Lebedev
>>
>>
>>   _______________________________
>> Mischa Tuffield PhD
>> http://mmt.me.uk/
>> @mischat
>>
