hadoop-common-dev mailing list archives

From "he yongqiang (JIRA)" <j...@apache.org>
Subject [jira] Commented: (HADOOP-5368) more user control on customized RecordReader
Date Thu, 05 Mar 2009 01:25:56 GMT

    [ https://issues.apache.org/jira/browse/HADOOP-5368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12679029#action_12679029 ]

he yongqiang commented on HADOOP-5368:

Yeah, that is what I did.
In 0.19, another way to work around this is to make FileSplit the superclass of the customized
InputSplit, because MapTask in 0.19 contains this piece of code:
if (instantiatedSplit instanceof FileSplit) {
      FileSplit fileSplit = (FileSplit) instantiatedSplit;
      job.set("map.input.file", fileSplit.getPath().toString());
      job.setLong("map.input.start", fileSplit.getStart());
      job.setLong("map.input.length", fileSplit.getLength());
}

But I am not sure whether this will keep working in the future, since in the trunk code this logic has
been moved into runOldMapper -> updateJobWithSplit, and it seems it will eventually be deprecated.
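
As a concrete illustration of that workaround, something like the sketch below should keep the instanceof check satisfied. BlockSplit and its blockCount field are made-up names for illustration only; FileSplit and its public (Path, long, long, String[]) constructor are the existing 0.19 API:

{code}
// Sketch only: a user-defined split that subclasses FileSplit so MapTask's
// "instanceof FileSplit" branch still sets map.input.file/start/length.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;

public class BlockSplit extends FileSplit {
  private int blockCount;   // example of extra, user-defined split metadata

  public BlockSplit() {
    // no-arg constructor so the framework can instantiate the split reflectively
    super((Path) null, 0, 0, (String[]) null);
  }

  public BlockSplit(Path file, long start, long length, String[] hosts,
                    int blockCount) {
    super(file, start, length, hosts);
    this.blockCount = blockCount;
  }

  public int getBlockCount() {
    return blockCount;
  }

  public void write(DataOutput out) throws IOException {
    super.write(out);         // FileSplit writes path, start and length
    out.writeInt(blockCount); // then append the custom field
  }

  public void readFields(DataInput in) throws IOException {
    super.readFields(in);
    blockCount = in.readInt();
  }
}
{code}

Since splits are serialized and re-created reflectively in the task, the no-arg constructor and the write/readFields overrides are what let the extra field survive the round trip.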

Maybe we can approach this another way and give the programmer more control by letting them
pass information through the RecordReader.
Another benefit is that the mapper's input does not have to be plain key-value pairs. For example, if we
compress many records into a single block (not an HDFS block), we can feed a whole block to the mapper and
have it output some key-value pairs. From the shuffle phase's point of view nothing changes, since the mapper
still outputs key-value pairs.
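
Just to sketch what I mean (BlockRecordReader, getCurrentFile() and the block-as-one-record behaviour are illustrative names, not existing API): a user-defined reader could expose which file it is reading and hand a whole block to the mapper as one value:

{code}
// Sketch only, against the old org.apache.hadoop.mapred API.
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

public class BlockRecordReader implements RecordReader<LongWritable, BytesWritable> {
  private final Path file;
  private final FSDataInputStream in;
  private final long start;
  private final long end;
  private long pos;

  public BlockRecordReader(JobConf job, FileSplit split) throws IOException {
    file = split.getPath();
    start = split.getStart();
    end = start + split.getLength();
    pos = start;
    FileSystem fs = file.getFileSystem(job);
    in = fs.open(file);
  }

  /** Extra accessor the mapper can use to pick a per-file strategy. */
  public Path getCurrentFile() {
    return file;
  }

  public boolean next(LongWritable key, BytesWritable value) throws IOException {
    if (pos >= end) {
      return false;
    }
    // Hand the whole split (one compressed "block") to the mapper as a single
    // value instead of one key/value pair per record.
    int len = (int) Math.min(end - pos, Integer.MAX_VALUE);
    byte[] buf = new byte[len];
    in.readFully(pos, buf, 0, len);
    key.set(pos);
    value.set(buf, 0, len);
    pos += len;
    return true;
  }

  public LongWritable createKey() { return new LongWritable(); }

  public BytesWritable createValue() { return new BytesWritable(); }

  public long getPos() { return pos; }

  public void close() throws IOException { in.close(); }

  public float getProgress() {
    return end == start ? 1.0f : (pos - start) / (float) (end - start);
  }
}
{code}

A mapper that could reach this reader directly would just call getCurrentFile() to choose a per-file strategy, which is exactly what the wrapping described in the issue below prevents.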

> more user control on customized RecordReader
> --------------------------------------------
>                 Key: HADOOP-5368
>                 URL: https://issues.apache.org/jira/browse/HADOOP-5368
>             Project: Hadoop Core
>          Issue Type: Wish
>            Reporter: he yongqiang
> Currently a user can define their own InputFormat and RecordReader, but the user has little control over them.
> For example, we feed multiple files into the mapper and want to handle them differently depending on which file the mapper is working on.
> This can be easily done as follows:
> {code}
>       public class BlockMapRunner implements MapRunnable {
> 	private BlockMapper mapper;
> 	@Override
> 	public void run(RecordReader input, OutputCollector output,
> 			Reporter reporter) throws IOException {
> 		if (mapper == null)
> 			return;
> 		BlockReader blkReader = (BlockReader) input;
> 		this.mapper.initialize(input);
> 		...........
> 	}
> 	@Override
> 	public void configure(JobConf job) {
> 		JobConf work = new JobConf(job);
> 		Class<? extends BlockMapper> mapCls = work.getBlockMapperClass();
> 		if (mapCls != null) {
> 			this.mapper = (BlockMapper) ReflectionUtils
> 					.newInstance(mapCls, job);
> 		}
> 	}
> }
> /*
> BlockMapper implements Mapper and is initialized from the RecordReader, from which we learn which file this mapper is working on and pick the right strategy for it.
> */
> public class ExtendedMapper extends BlockMapper {
> 	private Strategy strategy;
> 	private Configuration work;
> 	@Override
> 	public void configure(Configuration job) {
> 		this.work = job;
> 	}
> 	@Override
> 	public void initialize(RecordReader reader) throws IOException {
> 		String path = ((UserDefinedRecordReader) reader).which_File_We_Are_Working_On(); // ((UserDefinedRecordReader) reader) is wrong!
> 		this.strategy = this.work.getStrategy(path);
> 	}
> 	@Override
> 	public void map(Key k, V value, OutputCollector output, Reporter reporter)
> 			throws IOException {
> 		strategy.handle(k, value);
> 	}
> }
> {code}
> {color:red}
> However, the above code does not work. The reader passed into the mapper is wrapped by MapTask and is either a SkippingRecordReader or a TrackedRecordReader. We cannot cast it back, and we cannot pass any information through the user-defined RecordReader. If SkippingRecordReader and TrackedRecordReader had a method for getting the raw reader, this problem would not exist.
> {color}
> This problem could be resolved by launching many map-reduce jobs, one job for each file, but that is clearly not what we want.
> Or do other solutions exist?
> Any comments are appreciated.
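
To make the suggestion in the red paragraph above concrete, a wrapper reader that exposes its delegate might look like the sketch below. The class name and getRawReader() are hypothetical; MapTask's actual TrackedRecordReader and SkippingRecordReader have no such accessor today:

{code}
// Purely illustrative: getRawReader() is the kind of accessor the issue asks for.
import java.io.IOException;

import org.apache.hadoop.mapred.RecordReader;

class UnwrappableRecordReader<K, V> implements RecordReader<K, V> {
  private final RecordReader<K, V> rawIn;   // the user-defined reader

  UnwrappableRecordReader(RecordReader<K, V> rawIn) {
    this.rawIn = rawIn;
  }

  /** Hand back the raw, user-defined reader so the mapper can cast it. */
  public RecordReader<K, V> getRawReader() {
    return rawIn;
  }

  // Everything else delegates, as MapTask's wrappers do (plus their bookkeeping).
  public boolean next(K key, V value) throws IOException { return rawIn.next(key, value); }
  public K createKey() { return rawIn.createKey(); }
  public V createValue() { return rawIn.createValue(); }
  public long getPos() throws IOException { return rawIn.getPos(); }
  public void close() throws IOException { rawIn.close(); }
  public float getProgress() throws IOException { return rawIn.getProgress(); }
}
{code}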

This message is automatically generated by JIRA.
You can reply to this email to add a comment to the issue online.
