hadoop-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Lance Norskog <goks...@gmail.com>
Subject Re: Mapreduce using JSONObjects
Date Fri, 07 Jun 2013 17:16:41 GMT
A side point for Hadoop experts: a comparator is used for sorting in the 
shuffle. If a comparator always returns -1 for unequal objects, then 
sorting will take longer than it should because there will be a certain 
amount of items that are compared more than once.

Is this true?

On 06/05/2013 04:10 PM, Max Lebedev wrote:
>
> I’ve taken your advice and made a wrapper class which implements 
> WritableComparable. Thank you very much for your help. I believe 
> everything is working fine on that front. I used google’s gson for the 
> comparison.
>
>
> public int compareTo(Object o) {
>
>     JsonElement o1 = PARSER.parse(this.json.toString());
>
>     JsonElement o2 = PARSER.parse(o.toString());
>
>     if(o2.equals(o1))
>
>   return 0;
>
>     else
>
>   return -1;
>
> }
>
>
> The problem I have now is that only consecutive duplicates are 
> detected. Given 6 lines:
>
> {"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}
>
> {"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":false}
>
> {"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":true}
>
> {"ts":1368758947.291035,"isSecure":false,"version":2,"source":"sdk","debug":false}
>
> {"ts":1368758947.291035, 
> "source":"sdk","isSecure":false,"version":2,"debug":false}
>
> {"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false}
>
>
> I get back 1, 3, 4, and 6. I should be getting 1, 3 and 4 as 6 is 
> exactly equal to 1. If I switch 5 and 6, the original line 5 is no 
> longer filtered (I get 1,3,4,5,6). I’ve noticed that the compareTo 
> method is called a total of 13 times. I assume that in order for all 6 
> of the keys to be compared, 15 comparisons need to be made. Am I 
> missing something here? I’ve tested the compareTo manually and line 1 
> and 6 are interpreted as equal. My map reduce code currently looks 
> like this:
>
>
> class DupFilter{
>
>     private static final Gson GSON = new Gson();
>
>     private static final JsonParser PARSER = new JsonParser();
>
>     public static class Map extends MapReduceBase implements 
> Mapper<LongWritable, Text, JSONWrapper, IntWritable> {
>         public void map(LongWritable key, Text value, 
> OutputCollector<JSONWrapper, IntWritable> output, Reporter reporter) 
> throws IOException{
>
>             JsonElement je = PARSER.parse(value.toString());
>
>             JSONWrapper jow = null;
>
>             jow = new JSONWrapper(value.toString());
>
>             IntWritable one = new IntWritable(1);
>
>             output.collect(jow, one);
>
>     }
>
>     }
>
>     public static class Reduce extends MapReduceBase implements 
> Reducer<JSONWrapper, IntWritable, JSONWrapper, IntWritable> {
>
>   public void reduce(JSONWrapper jow, Iterator<IntWritable> values, 
> OutputCollector<JSONWrapper, IntWritable> output, Reporter reporter) 
> throws IOException {
>
>             int sum = 0;
>
>             while (values.hasNext())
>
>                 sum += values.next().get();
>
>             output.collect(jow, new IntWritable(sum));
>
>       }
>
>     }
>
>     public static void main(String[] args) throws Exception {
>
>         JobConf conf = new JobConf(DupFilter.class);
>
>   conf.setJobName("dupfilter");
>
>   conf.setOutputKeyClass(JSONWrapper.class);
>
>   conf.setOutputValueClass(IntWritable.class);
>
>   conf.setMapperClass(Map.class);
>
>   conf.setReducerClass(Reduce.class);
>
>   conf.setInputFormat(TextInputFormat.class);
>
>   conf.setOutputFormat(TextOutputFormat.class);
>
>   FileInputFormat.setInputPaths(conf, new Path(args[0]));
>
>   FileOutputFormat.setOutputPath(conf, new Path(args[1]));
>
>   JobClient.runJob(conf);
>
>     }
>
> }
>
> Thanks,
>
> Max Lebedev
>
>
>
> On Tue, Jun 4, 2013 at 10:58 PM, Rahul Bhattacharjee 
> <rahul.rec.dgp@gmail.com <mailto:rahul.rec.dgp@gmail.com>> wrote:
>
>     I agree with Shahab , you have to ensure that the key are writable
>     comparable and values are writable in order to be used in MR.
>
>     You can have writable comparable implementation wrapping the
>     actual json object.
>
>     Thanks,
>     Rahul
>
>
>     On Wed, Jun 5, 2013 at 5:09 AM, Mischa Tuffield <mischa@mmt.me.uk
>     <mailto:mischa@mmt.me.uk>> wrote:
>
>         Hello,
>
>         On 4 Jun 2013, at 23:49, Max Lebedev <max.l@actionx.com
>         <mailto:max.l@actionx.com>> wrote:
>
>>         Hi. I've been trying to use JSONObjects to identify
>>         duplicates in JSONStrings.
>>         The duplicate strings contain the same data, but not
>>         necessarily in the same order. For example the following two
>>         lines should be identified as duplicates (and filtered).
>>
>>         {"ts":1368758947.291035,"isSecure":true,"version":2,"source":"sdk","debug":false
>>         {"ts":1368758947.291035,"version":2,"source":"sdk","isSecure":true,"debug":false}
>>
>>
>         Can you not use the timestamp as a URI and emit them as URIs.
>         Then you have your mapper emit the following kv :
>
>         output.collect(ts, value);
>
>         And you would have a straight forward reducer that can dedup
>         based on the timestamps.
>
>         If above doesn't work for you, I would look at the jackson
>         library for mangling json in java. It method of using java
>         beans for json is clean from a code pov and comes with lots of
>         nice features.
>         http://stackoverflow.com/a/2255893
>
>         P.S. In your code you are using the old'er map reduce API, I
>         would look at using the newer APIs in this
>         package org.apache.hadoop.mapreduce
>
>         Mischa
>>
>>         This is the code:
>>
>>         class DupFilter{
>>
>>                 public static class Map extends MapReduceBase
>>         implements Mapper<LongWritable, Text, JSONObject, Text> {
>>
>>                         public void map(LongWritable key, Text value,
>>         OutputCollector<JSONObject, Text> output, Reporter reporter)
>>         throws IOException{
>>
>>         JSONObject jo = null;
>>
>>                         try {
>>
>>         jo = new JSONObject(value.toString());
>>
>>                                 } catch (JSONException e) {
>>
>>               e.printStackTrace();
>>
>>         }
>>
>>         output.collect(jo, value);
>>
>>                         }
>>
>>                 }
>>
>>                 public static class Reduce extends MapReduceBase
>>         implements Reducer<JSONObject, Text, NullWritable, Text> {
>>
>>                         public void reduce(JSONObject jo,
>>         Iterator<Text> lines, OutputCollector<NullWritable, Text>
>>         output, Reporter reporter) throws IOException {
>>
>>
>>         output.collect(null, lines.next());
>>
>>                         }
>>
>>                 }
>>
>>                 public static void main(String[] args) throws
>>         Exception {
>>
>>                         JobConf conf = new JobConf(DupFilter.class);
>>
>>         conf.setOutputKeyClass(JSONObject.class);
>>
>>         conf.setOutputValueClass(Text.class);
>>
>>         conf.setMapperClass(Map.class);
>>
>>         conf.setReducerClass(Reduce.class);
>>
>>         conf.setInputFormat(TextInputFormat.class);
>>
>>         conf.setOutputFormat(TextOutputFormat.class);
>>
>>         FileInputFormat.setInputPaths(conf, new Path(args[0]));
>>
>>         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
>>
>>         JobClient.runJob(conf);
>>
>>                 }
>>
>>         }
>>
>>         I get the following error:
>>
>>         java.lang.ClassCastException: class org.json.JSONObject
>>
>>                 at java.lang.Class.asSubclass(Class.java:3027)
>>
>>                 at
>>         org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
>>
>>
>>                 at
>>         org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:817)
>>
>>
>>                 at
>>         org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:383)
>>
>>                 at
>>         org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
>>
>>                 at
>>         org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
>>
>>
>>         It looks like it has something to do with
>>         conf.setOutputKeyClass(). Am I doing something wrong here?
>>
>>
>>         Thanks,
>>
>>         Max Lebedev
>>
>
>         _______________________________
>         Mischa Tuffield PhD
>         http://mmt.me.uk/
>         @mischat
>
>
>
>
>
>
>


Mime
View raw message