hadoop-mapreduce-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "rulinma (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (MAPREDUCE-5433) use mapreduce to parse hfiles and output keyvalue
Date Wed, 07 Aug 2013 02:40:48 GMT

    [ https://issues.apache.org/jira/browse/MAPREDUCE-5433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13731586#comment-13731586
] 

rulinma commented on MAPREDUCE-5433:
------------------------------------

Parse HFiles and write their contents to an HBase table:

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.hbase.client.Result;


/**
 * Map-only MapReduce driver that reads rows out of HFiles and writes them
 * into an HBase table as {@link Put}s.
 *
 * <p>Usage: {@code HFileMapperTable <input path> <output table>}
 * <ul>
 *   <li>{@code args[0]} - file or directory that is walked recursively; every
 *       regular file found becomes a job input path.</li>
 *   <li>{@code args[1]} - name of the table handed to
 *       {@code TableMapReduceUtil.initTableReducerJob}.</li>
 * </ul>
 */
public class HFileMapperTable {

	/**
	 * Mapper that turns each (row key, {@link Result}) pair into a {@link Put}
	 * carrying all of the row's KeyValues. Overrides {@code Mapper#map}.
	 */
	public static class MyMap extends
			Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put> {
		/**
		 * Last counter handle obtained by {@link #map}. Kept public and static
		 * only so existing references keep compiling; it is reassigned on every
		 * call to {@code map}.
		 */
		public static Counter ct = null;

		/**
		 * Emits one Put per non-empty row and bumps the
		 * {@code rowCount/totalRow} counter.
		 *
		 * @param key     row key of the current record
		 * @param value   cells of the current row
		 * @param context task context used to emit the Put and reach the counter
		 * @throws IOException          if writing the output fails
		 * @throws InterruptedException if the task is interrupted while writing
		 */
		public void map(ImmutableBytesWritable key, Result value,
				Context context) throws IOException, InterruptedException {
			List<KeyValue> kvList = value.list();
			// NOTE(review): some HBase versions return null from Result.list()
			// for an empty row; a Put with no cells is not worth emitting either.
			if (kvList == null || kvList.isEmpty()) {
				return;
			}

			Put put = new Put(key.copyBytes());
			for (KeyValue kv : kvList) {
				put.add(kv);
			}

			context.write(key, put);
			ct = context.getCounter("rowCount", "totalRow");
			ct.increment(1);
		}

		/**
		 * No per-task initialisation is needed.
		 *
		 * @param context task context (unused)
		 */
		public void setup(Context context) {
			// intentionally empty
		}
	}

	/**
	 * Configures and runs the job, then exits non-zero if it failed.
	 *
	 * @param args {@code args[0]} input path, {@code args[1]} output table name
	 * @throws IOException            on file system or job submission errors
	 * @throws InterruptedException   if interrupted while waiting for the job
	 * @throws ClassNotFoundException if a job class cannot be loaded
	 */
	public static void main(String[] args) throws IOException,
			InterruptedException, ClassNotFoundException {
		if (args.length < 2) {
			System.err.println("Usage: HFileMapperTable <input path> <output table>");
			System.exit(2);
		}

		Configuration conf = new Configuration();
		Job job = new Job(conf, "HFileMapperTable2");

		// Ship the jar that contains this driver and its nested mapper.
		job.setJarByClass(HFileMapperTable.class);
		job.setMapperClass(MyMap.class);
		job.setInputFormatClass(HFileInputFormatTwo.class);

		FileSystem fs = FileSystem.get(URI.create(args[0]), conf);
		List<FileStatus> files = new ArrayList<FileStatus>();
		addInputPathRecursively(files, fs, new Path(args[0]));
		if (files.isEmpty()) {
			System.err.println("No input files found under " + args[0]);
			System.exit(1);
		}

		// addInputPaths expects one comma-separated string of paths.
		StringBuilder inputPaths = new StringBuilder();
		for (FileStatus f : files) {
			if (inputPaths.length() > 0) {
				inputPaths.append(',');
			}
			inputPaths.append(f.getPath());
		}
		HFileInputFormatTwo.addInputPaths(job, inputPaths.toString());

		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(Put.class);

		// Map-only job: the mapper's Puts go straight to the table output.
		job.setNumReduceTasks(0);
		TableMapReduceUtil.initTableReducerJob(args[1], null, job);

		if (!job.waitForCompletion(true)) {
			System.err.println("hfile parse job failed.");
			System.exit(1);
		}
		System.out.println("hfile parsed.");
	}

	/**
	 * Collects every non-directory entry below {@code path}, depth first.
	 *
	 * @param result list that receives the file statuses found
	 * @param fs     file system used for listing
	 * @param path   file or directory to start from
	 * @throws IOException if listing a path fails
	 */
	public static void addInputPathRecursively(List<FileStatus> result,
			FileSystem fs, Path path) throws IOException {
		FileStatus[] children = fs.listStatus(path);
		// NOTE(review): older Hadoop releases are reported to return null here
		// for a missing path instead of throwing; treat that as "nothing found".
		if (children == null) {
			return;
		}
		for (FileStatus stat : children) {
			if (stat.isDirectory()) {
				addInputPathRecursively(result, fs, stat.getPath());
			} else {
				result.add(stat);
			}
		}
	}
}

                
> use mapreduce to parse hfiles and output keyvalue
> -------------------------------------------------
>
>                 Key: MAPREDUCE-5433
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5433
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: examples
>            Reporter: rulinma
>            Assignee: rulinma
>            Priority: Minor
>


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira

Mime
View raw message