hadoop-hdfs-user mailing list archives

From Max Lebedev <ma...@actionx.com>
Subject Re: Mapreduce using JSONObjects
Date Fri, 07 Jun 2013 19:49:41 GMT
Hi again.

I am attempting to compare the strings as JSON objects using hash codes,
with the ultimate goal of removing duplicates.

I've implemented the following solution:

1. I parse the input line into a JsonElement using the Google JSON parser
(Gson).

2. I take the hash code of the resulting JsonElement and use it as the key
for the <key, value> output pairs. It seems to work fine.

As I am new to Hadoop, I just want to run this by the community. Is there
some reason this wouldn't work?

Thank you very much for your help.

For reference, here is my code:

import java.io.IOException;
import java.util.Iterator;

import com.google.gson.JsonElement;
import com.google.gson.JsonParser;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

class DupFilter {

    private static final JsonParser PARSER = new JsonParser();

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, IntWritable, Text> {

        public void map(LongWritable key, Text value,
                OutputCollector<IntWritable, Text> output, Reporter reporter)
                throws IOException {

            // Skip missing or empty lines (a null check must use ==;
            // value.equals(null) would never be true).
            if (value == null || value.getLength() == 0)
                return;

            // Key each line by the hash of its parsed JSON so that
            // reordered-but-equal objects land in the same reduce group.
            JsonElement je = PARSER.parse(value.toString());
            output.collect(new IntWritable(je.hashCode()), value);
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<IntWritable, Text, IntWritable, Text> {

        public void reduce(IntWritable key, Iterator<Text> values,
                OutputCollector<IntWritable, Text> output, Reporter reporter)
                throws IOException {

            // Emit one representative line per hash, dropping the rest
            // as duplicates.
            output.collect(key, values.next());
        }
    }

    public static void main(String[] args) throws Exception {

        JobConf conf = new JobConf(DupFilter.class);

        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
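
For what it's worth, here is a quick standalone check suggesting that
member order does not affect the hash. (A sketch only: it assumes Gson's
JsonObject delegates equals()/hashCode() to its backing map of members,
as Gson 2.x appears to.)

import com.google.gson.JsonElement;
import com.google.gson.JsonParser;

public class HashCheck {
    public static void main(String[] args) {
        JsonParser parser = new JsonParser();
        // Same members, different order.
        JsonElement a = parser.parse(
            "{\"ts\":1368758947.291035,\"isSecure\":true,\"version\":2}");
        JsonElement b = parser.parse(
            "{\"version\":2,\"isSecure\":true,\"ts\":1368758947.291035}");
        System.out.println(a.equals(b));                  // expected: true
        System.out.println(a.hashCode() == b.hashCode()); // expected: true
    }
}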


On Fri, Jun 7, 2013 at 1:16 PM, Lance Norskog <goksron@gmail.com> wrote:

>  A side point for Hadoop experts: a comparator is used for sorting in the
> shuffle. If a comparator always returns -1 for unequal objects, then
> sorting will take longer than it should, because a certain number of
> items will be compared more than once.
>
> Is this true?
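>
> A minimal sketch of the contract problem (illustrative, not from Hadoop
> itself): such a comparator is not a total order, because compare(a, b)
> and compare(b, a) can both be negative, so a sort is free to leave equal
> keys non-adjacent.
>
> import java.util.Comparator;
>
> public class BrokenOrder {
>     public static void main(String[] args) {
>         Comparator<String> broken = new Comparator<String>() {
>             public int compare(String a, String b) {
>                 return a.equals(b) ? 0 : -1; // never returns +1
>             }
>         };
>         // The contract wants sgn(compare(a, b)) == -sgn(compare(b, a)):
>         System.out.println(broken.compare("x", "y")); // -1
>         System.out.println(broken.compare("y", "x")); // -1, not a total order
>     }
> }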
>
> On 06/05/2013 04:10 PM, Max Lebedev wrote:
>
>  I've taken your advice and made a wrapper class which implements
> WritableComparable. Thank you very much for your help. I believe everything
> is working fine on that front. I used Google's Gson for the comparison.
>
>
>  public int compareTo(Object o) {
>      JsonElement o1 = PARSER.parse(this.json.toString());
>      JsonElement o2 = PARSER.parse(o.toString());
>      if (o2.equals(o1))
>          return 0;
>      else
>          return -1;
>  }
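>
>  In case it is relevant to what follows, a variant I have been
> considering, which returns a consistent ordering instead of always -1 for
> unequal objects (canonicalize() is a hypothetical helper that would
> render an object with its keys in sorted order, giving unequal objects a
> stable total order):
>
>  public int compareTo(Object o) {
>      JsonElement o1 = PARSER.parse(this.json.toString());
>      JsonElement o2 = PARSER.parse(o.toString());
>      if (o1.equals(o2))
>          return 0;
>      // Hypothetical helper: members serialized in sorted-key order.
>      return canonicalize(o1).compareTo(canonicalize(o2));
>  }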
>
>
>  The problem I have now is that only consecutive duplicates are detected.
> Given 6 lines:
>
>
> {"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}
>
>
> {"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":false}
>
>
> {"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":true}
>
>
> {"ts":1368758947.291035,"isSecure":false,"version":2,"source":"sdk","debug":false}
>
> {"ts":1368758947.291035,
> "source":"sdk","isSecure":false,"version":2,"debug":false}
>
>
> {"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}
>
>
>  I get back 1, 3, 4, and 6. I should be getting only 1, 3, and 4, as 6 is
> exactly equal to 1. If I switch 5 and 6, the original line 5 is no longer
> filtered (I get 1, 3, 4, 5, 6). I've noticed that the compareTo method is
> called a total of 13 times. I assume that in order for all 6 of the keys to
> be compared, 15 comparisons need to be made. Am I missing something here?
> I've tested the compareTo manually, and lines 1 and 6 are interpreted as
> equal. My map reduce code currently looks like this:
>
>
>  class DupFilter {
>
>      private static final JsonParser PARSER = new JsonParser();
>
>      public static class Map extends MapReduceBase implements
>              Mapper<LongWritable, Text, JSONWrapper, IntWritable> {
>
>          public void map(LongWritable key, Text value,
>                  OutputCollector<JSONWrapper, IntWritable> output,
>                  Reporter reporter) throws IOException {
>
>              // Parse up front so malformed lines fail fast; the parsed
>              // element itself is not otherwise used here.
>              JsonElement je = PARSER.parse(value.toString());
>
>              JSONWrapper jow = new JSONWrapper(value.toString());
>              output.collect(jow, new IntWritable(1));
>          }
>      }
>
>      public static class Reduce extends MapReduceBase implements
>              Reducer<JSONWrapper, IntWritable, JSONWrapper, IntWritable> {
>
>          public void reduce(JSONWrapper jow, Iterator<IntWritable> values,
>                  OutputCollector<JSONWrapper, IntWritable> output,
>                  Reporter reporter) throws IOException {
>
>              // Count how many times each (JSON-equal) key occurred.
>              int sum = 0;
>              while (values.hasNext())
>                  sum += values.next().get();
>              output.collect(jow, new IntWritable(sum));
>          }
>      }
>
>      public static void main(String[] args) throws Exception {
>
>          JobConf conf = new JobConf(DupFilter.class);
>          conf.setJobName("dupfilter");
>
>          conf.setOutputKeyClass(JSONWrapper.class);
>          conf.setOutputValueClass(IntWritable.class);
>
>          conf.setMapperClass(Map.class);
>          conf.setReducerClass(Reduce.class);
>
>          conf.setInputFormat(TextInputFormat.class);
>          conf.setOutputFormat(TextOutputFormat.class);
>
>          FileInputFormat.setInputPaths(conf, new Path(args[0]));
>          FileOutputFormat.setOutputPath(conf, new Path(args[1]));
>
>          JobClient.runJob(conf);
>      }
>  }
>
> Thanks,
>
>  Max Lebedev
>
>
> On Tue, Jun 4, 2013 at 10:58 PM, Rahul Bhattacharjee <
> rahul.rec.dgp@gmail.com> wrote:
>
>>  I agree with Shahab: you have to ensure that the keys are
>> WritableComparable and the values are Writable in order to be used in MR.
>>
>>  You can have a WritableComparable implementation wrapping the actual
>> JSON object, as sketched below.
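>>
>>  A minimal sketch of such a wrapper (illustrative only: the class and
>> field names are assumptions, and the raw-string compareTo shown here
>> would not treat reordered-but-equal JSON as equal):
>>
>> import java.io.*;
>> import org.apache.hadoop.io.*;
>>
>> public class JSONWrapper implements WritableComparable<JSONWrapper> {
>>
>>     private Text json = new Text();
>>
>>     public JSONWrapper() {}                       // required by Hadoop
>>     public JSONWrapper(String s) { json.set(s); }
>>
>>     public void write(DataOutput out) throws IOException {
>>         json.write(out);
>>     }
>>
>>     public void readFields(DataInput in) throws IOException {
>>         json.readFields(in);
>>     }
>>
>>     // Raw-string order; a real implementation would compare parsed JSON.
>>     public int compareTo(JSONWrapper o) { return json.compareTo(o.json); }
>>
>>     public boolean equals(Object o) {
>>         return o instanceof JSONWrapper && json.equals(((JSONWrapper) o).json);
>>     }
>>
>>     public int hashCode() { return json.hashCode(); } // used by HashPartitioner
>>
>>     public String toString() { return json.toString(); }
>> }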
>>
>>  Thanks,
>> Rahul
>>
>>
>> On Wed, Jun 5, 2013 at 5:09 AM, Mischa Tuffield <mischa@mmt.me.uk> wrote:
>>
>>> Hello,
>>>
>>>   On 4 Jun 2013, at 23:49, Max Lebedev <max.l@actionx.com> wrote:
>>>
>>>  Hi. I've been trying to use JSONObjects to identify duplicates in
>>> JSON strings. The duplicate strings contain the same data, but not
>>> necessarily in the same order. For example, the following two lines
>>> should be identified as duplicates (and filtered):
>>>
>>>
>>> {"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false
>>>
>>> {"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":false}
>>>
>>>  Can you not use the timestamp as a URI and emit them as URIs? Then you
>>> would have your mapper emit the following kv pair:
>>>
>>>  output.collect(ts, value);
>>>
>>>  And you would have a straightforward reducer that can dedup based on
>>> the timestamps.
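>>>
>>>  A rough sketch of that mapper body (assuming, per the above, that
>>> records sharing a "ts" value are duplicates, and reusing the Gson
>>> PARSER from earlier in the thread):
>>>
>>>  public void map(LongWritable key, Text value,
>>>          OutputCollector<Text, Text> output, Reporter reporter)
>>>          throws IOException {
>>>      // Pull out the "ts" field and use its string form as the key.
>>>      String ts = PARSER.parse(value.toString())
>>>                        .getAsJsonObject().get("ts").getAsString();
>>>      output.collect(new Text(ts), value);
>>>  }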
>>>
>>>  If the above doesn't work for you, I would look at the Jackson library
>>> for mangling JSON in Java. Its method of using Java beans for JSON is
>>> clean from a code point of view and comes with lots of nice features.
>>> http://stackoverflow.com/a/2255893
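>>>
>>>  For instance, an order-insensitive equality check with Jackson's tree
>>> model might look like this (a sketch; assumes the Jackson 1.x
>>> ObjectMapper is on the classpath):
>>>
>>>  import java.io.IOException;
>>>  import org.codehaus.jackson.JsonNode;
>>>  import org.codehaus.jackson.map.ObjectMapper;
>>>
>>>  public class JsonEquals {
>>>      public static void main(String[] args) throws IOException {
>>>          ObjectMapper mapper = new ObjectMapper();
>>>          JsonNode a = mapper.readTree(
>>>              "{\"ts\":1368758947.291035,\"isSecure\":true}");
>>>          JsonNode b = mapper.readTree(
>>>              "{\"isSecure\":true,\"ts\":1368758947.291035}");
>>>          // ObjectNode equality compares fields as a map, ignoring order.
>>>          System.out.println(a.equals(b)); // expected: true
>>>      }
>>>  }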
>>>
>>>  P.S. In your code you are using the older MapReduce API; I would look
>>> at using the newer APIs in the org.apache.hadoop.mapreduce package.
>>>
>>>  Mischa
>>>
>>>  This is the code:
>>>
>>> class DupFilter {
>>>
>>>     public static class Map extends MapReduceBase implements
>>>             Mapper<LongWritable, Text, JSONObject, Text> {
>>>
>>>         public void map(LongWritable key, Text value,
>>>                 OutputCollector<JSONObject, Text> output,
>>>                 Reporter reporter) throws IOException {
>>>
>>>             JSONObject jo = null;
>>>             try {
>>>                 jo = new JSONObject(value.toString());
>>>             } catch (JSONException e) {
>>>                 e.printStackTrace();
>>>             }
>>>             output.collect(jo, value);
>>>         }
>>>     }
>>>
>>>     public static class Reduce extends MapReduceBase implements
>>>             Reducer<JSONObject, Text, NullWritable, Text> {
>>>
>>>         public void reduce(JSONObject jo, Iterator<Text> lines,
>>>                 OutputCollector<NullWritable, Text> output,
>>>                 Reporter reporter) throws IOException {
>>>
>>>             output.collect(null, lines.next());
>>>         }
>>>     }
>>>
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         JobConf conf = new JobConf(DupFilter.class);
>>>
>>>         conf.setOutputKeyClass(JSONObject.class);
>>>         conf.setOutputValueClass(Text.class);
>>>
>>>         conf.setMapperClass(Map.class);
>>>         conf.setReducerClass(Reduce.class);
>>>
>>>         conf.setInputFormat(TextInputFormat.class);
>>>         conf.setOutputFormat(TextOutputFormat.class);
>>>
>>>         FileInputFormat.setInputPaths(conf, new Path(args[0]));
>>>         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
>>>
>>>         JobClient.runJob(conf);
>>>     }
>>> }
>>>
>>> I get the following error:
>>>
>>> java.lang.ClassCastException: class org.json.JSONObject
>>>         at java.lang.Class.asSubclass(Class.java:3027)
>>>         at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
>>>         at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:817)
>>>         at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:383)
>>>         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
>>>         at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
>>>
>>>
>>>
>>>  It looks like it has something to do with conf.setOutputKeyClass(). Am
>>> I doing something wrong here?
>>>
>>>
>>>  Thanks,
>>>
>>> Max Lebedev
>>>
>>>
>>>   _______________________________
>>> Mischa Tuffield PhD
>>> http://mmt.me.uk/
>>> @mischat
