hadoop-mapreduce-user mailing list archives

From xeonmailinglist-gmail <xeonmailingl...@gmail.com>
Subject can't set partition class to the configuration
Date Wed, 01 Apr 2015 14:54:22 GMT
Hi,

I have created a Mapper class [3] that filters out key-value pairs that
go to a specific partition. When I set the partitioner class in my code
[1], I get the error in [2], and I don’t understand why this is
happening. Can anyone help me fix this?

[1]

|Configuration conf = cj.getConfiguration();
cj.setPartitionerClass(MyFilterMapper.class);
|

[2]

|The method setPartitionerClass(Class<? extends Partitioner>) in the type Job is not
applicable for the arguments (Class<JobExecution.MyFilterMapper>)
|

[3]

|public static class MyFilterMapper
     extends Mapper<Object, Text, Text, IntWritable>{

         private Text word = new Text();
         private IntWritable rvalue = new IntWritable();

         public static final String REDUCE_TASK_REEXECUTE =
                 "mapreduce.reduce.task.reexecute";
         public static final int NULL_REDUCE_TASK = -1;

         private Class<? extends Partitioner<?, ?>> partitionerClass;
         private org.apache.hadoop.mapreduce.Partitioner<Object, Text> partitionerInstance;

         public void map(Object key, Text value, Context context
                 ) throws IOException, InterruptedException {
             Configuration conf = context.getConfiguration();
             partitionerInstance = new MyHashPartitioner<Object, Text>();

             int[] task_reexecute = conf.getInts(REDUCE_TASK_REEXECUTE);
             int nr_reduce_tasks = conf.getInt("mapreduce.job.reduces", 0);
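             // Arrays.toString prints the array contents rather than its identity hash
             System.out.println("Tasks reexecute: " + java.util.Arrays.toString(task_reexecute)
                     + " NRREDUCETASKS: " + nr_reduce_tasks);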
             StringTokenizer itr = new StringTokenizer(value.toString());
             while (itr.hasMoreTokens()) {
                 String wword = itr.nextToken();
                 Integer rrvalue = Integer.valueOf(itr.nextToken());
                 int partition = partitionerInstance.getPartition(wword, value, nr_reduce_tasks);

                 if (contains(partition, task_reexecute)) {
                     System.out.println("Partition Consumed: " + partition + " - key: "
                             + key.toString() + " word: " + wword + " value - " + value.toString());
                     System.out.println("Partition Consumed: " + partition + " - word: "
                             + wword + " value - "); // + rrvalue);

                     word.set(wword);
                     rvalue.set(rrvalue);
                     context.write(word, rvalue);
                 }
             }
         }

         public boolean contains(int partition, int[] set) {
             for(int i=0; i<set.length; i++){
                 if (partition == set[i])
                     return true;
             }

             return false;
         }
     }
|
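
One likely reading of the error in [2]: Job.setPartitionerClass only accepts a subclass of Partitioner, while MyFilterMapper in [3] extends Mapper. The sketch below reproduces that type bound with stand-in types. The Mapper, Partitioner and Job classes here are simplified, hypothetical stubs (not the Hadoop classes), so that the example compiles without Hadoop on the classpath. In the real job, the mapper would presumably be registered with cj.setMapperClass(MyFilterMapper.class), and a Partitioner subclass such as MyHashPartitioner passed to cj.setPartitionerClass.

```java
// Stand-in types only: these stubs mimic the shape of the Hadoop classes
// so the example compiles on its own. They are NOT the real Hadoop API.
@SuppressWarnings("rawtypes")
public class PartitionerBoundDemo {

    static abstract class Mapper<KI, VI, KO, VO> { }

    static abstract class Partitioner<K, V> {
        abstract int getPartition(K key, V value, int numPartitions);
    }

    // Extends Mapper, like MyFilterMapper in [3]; it is not a Partitioner.
    static class MyFilterMapper extends Mapper<Object, String, String, Integer> { }

    // Extends Partitioner, so it satisfies the bound of setPartitionerClass.
    static class MyHashPartitioner<K, V> extends Partitioner<K, V> {
        @Override
        int getPartition(K key, V value, int numPartitions) {
            // Mask the sign bit so the result is never negative.
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // Hypothetical stub with the same bounded-wildcard setters as in [2].
    static class Job {
        Class<? extends Mapper> mapperClass;
        Class<? extends Partitioner> partitionerClass;

        void setMapperClass(Class<? extends Mapper> cls) { mapperClass = cls; }
        void setPartitionerClass(Class<? extends Partitioner> cls) { partitionerClass = cls; }
    }

    static Job configure() {
        Job cj = new Job();
        cj.setMapperClass(MyFilterMapper.class);          // a Mapper goes here
        cj.setPartitionerClass(MyHashPartitioner.class);  // a Partitioner goes here
        // cj.setPartitionerClass(MyFilterMapper.class);  // does not compile: not a Partitioner
        return cj;
    }

    public static void main(String[] args) {
        Job cj = configure();
        // MyFilterMapper is not assignable to Partitioner, hence the compile error.
        System.out.println(Partitioner.class.isAssignableFrom(MyFilterMapper.class)); // false
        System.out.println(Partitioner.class.isAssignableFrom(cj.partitionerClass));  // true
    }
}
```

The filtering in [3] already instantiates MyHashPartitioner inside map(), so under this reading the mapper does not itself need to be installed as the job's partitioner.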


-- 
Thanks,

