hbase-issues mailing list archives

From "pangxiaoxi (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (HBASE-12757) MR map's input rowkey out of range of current Region
Date Thu, 25 Dec 2014 01:38:13 GMT

    [ https://issues.apache.org/jira/browse/HBASE-12757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14258616#comment-14258616 ]

pangxiaoxi commented on HBASE-12757:
------------------------------------

I have a table A that I want to translate into a table B. A and B have different rowkeys, but most of the values are the same.
I wrote a custom MR job like ImportTSV, but its input is table A.
When I execute the MR job, some map() calls receive an input rowkey that is out of the range of the current Region (obtained from the InputSplit).
This may lose data or pick up unwanted data.
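
Below is a minimal standalone check, outside MapReduce, that scans each region's [startKey, endKey) range with a plain client Scan and reports any returned row that falls outside that range; it is only a sketch, the class name and the table-name argument are placeholders, and it assumes a normal hbase-site.xml on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class CheckRegionRanges {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		HTable table = new HTable(conf, args[0]); // e.g. "news_201411"
		Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
		for (int i = 0; i < keys.getFirst().length; i++) {
			byte[] start = keys.getFirst()[i];
			byte[] end = keys.getSecond()[i];
			// scan exactly the range a TableInputFormat split would cover
			Scan scan = new Scan(start, end);
			scan.setCaching(100);
			ResultScanner scanner = table.getScanner(scan);
			for (Result r : scanner) {
				byte[] row = r.getRow();
				boolean beforeStart = Bytes.compareTo(row, start) < 0;
				boolean afterEnd = end.length > 0 && Bytes.compareTo(row, end) >= 0;
				if (beforeStart || afterEnd) {
					System.err.println("out of range: " + Bytes.toStringBinary(row));
				}
			}
			scanner.close();
		}
		table.close();
	}
}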

code :
=======================================
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// HbaseUtil, GenUrlCrc64, weibo_type and ebusiness_type come from my own code and are not shown here.

public class RebuildTable{
	
	public final static class RebuildMapper extends TableMapper<ImmutableBytesWritable, Writable> {
		public boolean isOutputRel = true;
		public boolean isOutputData = true;
		private static byte[] DOC_FAMILY = Bytes.toBytes("doc");
		private static byte[] URL_QUALIFIER= Bytes.toBytes("url");
		private static byte[] FWDURL_QUALIFIER = Bytes.toBytes("forward_url");
		private static byte[] PKEY_QUALIFIER = Bytes.toBytes("rel_pkey");
		private static byte[] DATAKEY_QUALIFIER = Bytes.toBytes("data_key");
		private static byte[] TN_QUALIFIER = Bytes.toBytes("table_name");
		private static byte[] CURL_QUALIFIER = Bytes.toBytes("c_url");
		private Logger logger = LoggerFactory.getLogger(RebuildMapper.class);
		protected int type = -1;
		protected long count = 0;
		private HTable relTable = null;
		private String table_name = null;
		private String table_ng_name = null;
		private String location = null;
		private byte[] start_row = null;
		private byte[] end_row = null;
		
		@Override 
		protected void setup(Context context){
			type = Integer.valueOf(context.getConfiguration().get("job.hylanda.data_type"));
			// initialize the xxxx_rel_ng table object
			try {
				// read the configuration values first, then log them
				isOutputRel = context.getConfiguration().getBoolean("conf.output_rel", true);
				isOutputData = context.getConfiguration().getBoolean("conf.output_data", true);
				table_name = context.getConfiguration().get("conf.table_name");
				table_ng_name = context.getConfiguration().get("conf.table_ng_name");
				System.out.println(table_name + "=>" + table_ng_name);
				// remember this split's region boundaries so map() can check each input rowkey against them
				TableSplit split = (TableSplit) context.getInputSplit();
				if (split != null) {
					start_row = split.getStartRow();
					end_row = split.getEndRow();
					System.out.println(split.toString());
					location = split.getRegionLocation();
					System.out.println(String.format("location=%1$s,start_row=%2$s , end_row=%3$s",
							location, HbaseUtil.printRowkey(start_row), HbaseUtil.printRowkey(end_row)));
				}
				if(isOutputRel){
					Configuration conf = new Configuration(context.getConfiguration());
					conf.setLong("hbase.htable.threads.keepalivetime", 180);
					relTable = new HTable(conf,context.getConfiguration().get("conf.reltable_name"));
					relTable.setAutoFlush(false);
				}
			} catch (Exception e) {
				// TODO auto-generated catch block
				logger.error("setup ex:"+e);
				e.printStackTrace();
			}
		}
		@Override
		protected void cleanup(Context context){
			if(relTable != null){
				try {
					relTable.flushCommits();
					relTable.close();
				} catch (IOException e) {
					// TODO auto-generated catch block
					e.printStackTrace();
				}
			}
		}
		@Override
		public void map(ImmutableBytesWritable row,Result columns ,Context context) {
			try{
				byte[] rowkey = row.get();
				// test code: skip rows outside this split's region range (an empty end_row means the last region)
				if(Bytes.compareTo(start_row, rowkey) > 0
						|| (end_row.length > 0 && Bytes.compareTo(end_row, rowkey) < 0)){
					TableSplit split = (TableSplit)context.getInputSplit();
					if(split != null){
						SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
						System.err.println(String.format("%5$s\tlocation=%1$s,start_row=%2$s ,rowkey=%3$s ,end_row=%4$s",
								split.getRegionLocation(), HbaseUtil.printRowkey(split.getStartRow()),
								HbaseUtil.printRowkey(rowkey), HbaseUtil.printRowkey(split.getEndRow()),
								sdf.format(new Date())));
					}
					return;
				}

				if(count++ % 10000 == 0) {
					logger.info("Scan="+ count + " ;rowkey=" + HbaseUtil.printRowkey(rowkey));
				}
				String url = Bytes.toString(columns.getValue(DOC_FAMILY , URL_QUALIFIER));
				long rcrc = GenUrlCrc64.GenReverseCrc64Long(url); //gen 64-bit crc 
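				// overwrite the first 8 bytes of the rowkey in place with the reverse crc; this rewritten key becomes the output (ng table) rowkey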
				Bytes.putLong(rowkey, 0, rcrc);
				Put put = new Put(rowkey); // Put written to the ng table
				List<Put> puts = new ArrayList<Put>(); // Puts written to the rel table
				if(type == weibo_type  ){
					for(KeyValue kv :columns.list()){
						if(Bytes.toString(kv.getQualifier()).equals("rel_pkey")){
							byte[] pkey = columns.getValue(DOC_FAMILY , PKEY_QUALIFIER);
							String pkurl = Bytes.toString(columns.getValue(DOC_FAMILY , FWDURL_QUALIFIER)); // get the original string
							Bytes.putLong(pkey, 0, GenUrlCrc64.GenReverseCrc64Long(pkurl));
							put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), pkey);
							Put put_rel = new Put(Bytes.toBytes(GenUrlCrc64.GenCrc64Long(pkurl)));
							put_rel.add(DOC_FAMILY , Bytes.add(Bytes.toBytes("rel_"), rowkey),Bytes.toBytes(table_ng_name));
							puts.add(put_rel);
						}else{
							put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());	
						}
					}
					
				}else if(type == ebusiness_type){
					for(KeyValue kv :columns.list()){
						if(Bytes.toString(kv.getQualifier()).equals("rel_pkey")){
							String pkurl = Bytes.toString(columns.getValue(DOC_FAMILY , CURL_QUALIFIER)); // get the original string
							byte[] pkey = columns.getValue(DOC_FAMILY , PKEY_QUALIFIER);
							Bytes.putLong(pkey, 0, GenUrlCrc64.GenReverseCrc64Long(pkurl));
							put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), pkey);
							//
							Put put_rel = new Put(Bytes.toBytes(GenUrlCrc64.GenCrc64Long(pkurl)));
							put_rel.add(DOC_FAMILY , Bytes.add(Bytes.toBytes("rel_"), rowkey),Bytes.toBytes(table_ng_name));
							puts.add(put_rel);
						}else{
							put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());	
						}
					}
				}else{
					for(KeyValue kv :columns.list()){
						put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());
					}
				}
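				// retry the ng-table write until it succeeds; break exits after the first successful attempt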
				while(isOutputData){
					try{
						context.write(new ImmutableBytesWritable(rowkey), put);
						break;
					}catch(Exception ex){
						logger.error("context write ex:"+ex);
					}
				}
				// write the rel table's basic info
				byte[] urlcrc = Bytes.tail(rowkey, 8);
				Put putRel = new Put(urlcrc);
				putRel.add(DOC_FAMILY , DATAKEY_QUALIFIER, rowkey);
				putRel.add(DOC_FAMILY , TN_QUALIFIER, Bytes.toBytes(table_ng_name));
				puts.add(putRel);
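				// same retry pattern for the rel table puts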
				while(isOutputRel && relTable != null){
					try{
						relTable.put(puts);
						break;
					}catch(Exception ex){
						logger.error("put ex:"+ex.toString());
					}
				}
				context.getCounter("Rebuild","success").increment(1);
			}catch(Exception ex){
				System.err.println("Err:"+ex +",row:" + Bytes.toStringBinary(row.get()));
				context.getCounter("Rebuild","failed").increment(1);
			}
		}
	}

		public static void main(String[] argv) throws Exception {
			
			String hdfsip = "10.0.5.34";
			String zkIps = "10.0.5.34";
			Configuration conf = new Configuration();
			System.setProperty("HADOOP_USER_NAME", "hadoop");
			Configuration hbaseConfiguration = HBaseConfiguration.create(conf);
			hbaseConfiguration.set("mapred.job.priority", JobPriority.HIGH.name());
			hbaseConfiguration.set("fs.default.name", "hdfs://" + hdfsip + ":9000");
			hbaseConfiguration.set("mapred.job.tracker", hdfsip + ":9001");
			hbaseConfiguration.set("hbase.zookeeper.quorum", zkIps);
			hbaseConfiguration.set("hbase.zookeeper.property.clientPort", "2181");
			hbaseConfiguration.set("mapred.reduce.tasks.speculative.execution", "false");
			hbaseConfiguration.set("mapred.map.tasks.speculative.execution", "false");
			hbaseConfiguration.set("mapred.job.queue.name", "default");
			hbaseConfiguration.set("mapred.child.java.opts","-Xmx1024m");
			hbaseConfiguration.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, 180000);  
			hbaseConfiguration.setLong("dfs.socket.timeout", 180000);
			hbaseConfiguration.setLong("hbase.htable.threads.keepalivetime",180);
			
			
			String tablename = "news_201411";
			String tablename_ng = "news_201411_ng";
			String tablename_rel_ng = "news_rel_ng";
			HbaseUtil.createRelTable(hbaseConfiguration, tablename_rel_ng); //create another table
			
			hbaseConfiguration.set("conf.table_name", tablename);
			hbaseConfiguration.set("conf.table_ng_name", tablename_ng);
			hbaseConfiguration.set("conf.reltable_name", tablename_rel_ng);
			
			
			Job job = new Job(hbaseConfiguration,"job_rebuilder_"+tablename);
			job.setJarByClass(RebuildMapper.class);
		
			List<Scan> scans = new ArrayList<Scan>();
			Scan scan = new Scan();
			scan.setCacheBlocks(false);
			scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tablename));
			scan.setCaching(100);
			scans.add(scan);
			TableMapReduceUtil.initTableMapperJob(scans, RebuildMapper.class, ImmutableBytesWritable.class, Put.class, job);
			job.setReducerClass(PutSortReducer.class);
			String hfileOutPath = "/user/hadoop/"+tablename_ng ;
			Path outputDir = new Path(hfileOutPath);
			FileOutputFormat.setOutputPath(job, outputDir);
			job.setMapOutputKeyClass(ImmutableBytesWritable.class);
			job.setMapOutputValueClass(Put.class);

			// configure the incremental load against the output (ng) table so the HFiles are partitioned by its regions
			HTable table = new HTable(hbaseConfiguration, tablename_ng);
			HFileOutputFormat.configureIncrementalLoad(job, table);

			System.out.println("set job begin,'" + tablename + "' => '" + tablename_ng + "'");
			boolean bCompleted = job.waitForCompletion(true);

		}		
		
}

> MR map's input rowkey out of range of current Region 
> -----------------------------------------------------
>
>                 Key: HBASE-12757
>                 URL: https://issues.apache.org/jira/browse/HBASE-12757
>             Project: HBase
>          Issue Type: Bug
>          Components: Client, hbase
>    Affects Versions: 0.94.7
>         Environment: hadoop 1.1.2, r1440782
> hbase 0.94.7
> linux  2.6.32-279.el6.x86_64
>            Reporter: pangxiaoxi
>            Priority: Critical
>
> I execute a MapReduce job that scans the whole table; sometimes a map input rowkey is out of the range of the current Region (obtained from the InputSplit).
> This may lose data or pick up unwanted data.
> ps. I want to use ImportTSV to translate the table.....
> eg. 
> location=datanode11,start_row=D9CB114FD09A82A3_0000000000000000_m_43DAAA689D4AFC86 ,rowkey=D323E1D0A51E5185_0000000000000000_m_75686B8924108044 ,end_row=DB0C4FC44E6D80C1_0000000000000000_m_E956CC65322BA3E5



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
