Hello everyone,

I wrote a MapReduce program to import CSV data from HDFS into HBase, but after the import the space used on HDFS grew far more than I expected: the original input is 69 MB on HDFS, yet once it is loaded into HBase my HDFS usage increases by about 3 GB. What am I doing wrong in the program below?

Thanks
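Before the program itself, for reference, a minimal sketch of one way to compare the raw input with the table's footprint on HDFS (the two paths here are only placeholders, not my actual layout); getLength() is the logical size, while getSpaceConsumed() also counts HDFS block replication:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsSizeCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hydra0001:8020");
        FileSystem fs = FileSystem.get(conf);
        // Placeholder paths: the raw CSV input and the HBase root directory on HDFS.
        for (String p : new String[] { "/user/me/input", "/hbase" }) {
            ContentSummary cs = fs.getContentSummary(new Path(p));
            System.out.println(p + ": length=" + cs.getLength()
                    + " bytes, spaceConsumed=" + cs.getSpaceConsumed() + " bytes");
        }
    }
}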
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;

public class MRImportHBaseCsv {

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hydra0001:8020");
        conf.set("yarn.resourcemanager.address", "hydra0001:8032");
        Job job = createSubmitTableJob(conf, args);
        job.submit();
    }

    public static Job createSubmitTableJob(Configuration conf, String[] args)
            throws IOException {
        String tableName = args[0];
        Path inputDir = new Path(args[1]);

        Job job = Job.getInstance(conf, "HDFS_TO_HBase");
        job.setJarByClass(HourlyImporter.class);
        FileInputFormat.setInputPaths(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(HourlyImporter.class);

        // ++++ insert into the table directly using TableOutputFormat ++++
        // Map-only job: initTableReducerJob(tableName, null, job) wires up
        // TableOutputFormat, and setNumReduceTasks(0) lets the mapper write
        // its Puts straight to the table.
        TableMapReduceUtil.initTableReducerJob(tableName, null, job);
        job.setNumReduceTasks(0);
        TableMapReduceUtil.addDependencyJars(job);
        return job;
    }

    static class HourlyImporter extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        // Column family the values are written into.
        private static final byte[] family = Bytes.toBytes("s");
        // First CSV field is the row key; the rest become column qualifiers.
        private static final String columns =
                "HBASE_ROW_KEY,STATION,YEAR,MONTH,DAY,HOUR,MINUTE";

        private long ts;

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // Note: ts is set here but never used when building the Puts.
            ts = System.currentTimeMillis();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            ArrayList<String> columnsList = Lists.newArrayList(
                    Splitter.on(',').trimResults().split(columns));
            String line = value.toString();
            ArrayList<String> columnValues = Lists.newArrayList(
                    Splitter.on(',').trimResults().split(line));

            // The first CSV field becomes the row key.
            byte[] bRowKey = Bytes.toBytes(columnValues.get(0));
            ImmutableBytesWritable rowKey = new ImmutableBytesWritable(bRowKey);
            Put p = new Put(bRowKey);

            // Every remaining field is written as its own cell under family "s".
            for (int i = 1; i < columnValues.size(); i++) {
                p.add(family, Bytes.toBytes(columnsList.get(i)),
                        Bytes.toBytes(columnValues.get(i)));
            }
            context.write(rowKey, p);
        }
    }
}
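To make the mapper's write pattern concrete, here is a small standalone sketch (not part of the job, using a made-up sample line) that builds a Put the same way map() does. It shows that every CSV field after the key becomes its own cell, and each cell carries the full row key, family, and qualifier alongside the value:

import java.util.ArrayList;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;

public class PutSketch {
    public static void main(String[] args) {
        byte[] family = Bytes.toBytes("s");
        String columns = "HBASE_ROW_KEY,STATION,YEAR,MONTH,DAY,HOUR,MINUTE";
        // Made-up sample line; only the shape matters.
        String line = "0001-201301010000,0001,2013,01,01,00,00";

        ArrayList<String> columnsList = Lists.newArrayList(
                Splitter.on(',').trimResults().split(columns));
        ArrayList<String> columnValues = Lists.newArrayList(
                Splitter.on(',').trimResults().split(line));

        Put p = new Put(Bytes.toBytes(columnValues.get(0)));
        for (int i = 1; i < columnValues.size(); i++) {
            // Each field is stored as a separate KeyValue that repeats the row key,
            // the family "s", and the qualifier, so the stored cell is larger than
            // the raw CSV value itself.
            p.add(family, Bytes.toBytes(columnsList.get(i)),
                    Bytes.toBytes(columnValues.get(i)));
        }
        // heapSize() is only the client-side in-memory estimate of this Put.
        System.out.println("cells: " + p.size() + ", heapSize: " + p.heapSize() + " bytes");
    }
}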
--
In the Hadoop world I am just a novice exploring the whole Hadoop ecosystem; I hope that one day I can contribute code of my own.
YanBit
yankunhadoop@gmail.com