Date: Thu, 25 Dec 2014 01:38:13 +0000 (UTC)
From: "pangxiaoxi (JIRA)"
To: issues@hbase.apache.org
Subject: [jira] [Commented] (HBASE-12757) MR map's input rowkey out of range of current Region

    [ https://issues.apache.org/jira/browse/HBASE-12757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14258616#comment-14258616 ]

pangxiaoxi commented on HBASE-12757:
------------------------------------

I have a table A that I want to translate into a table B. A and B have different rowkeys, but most of their values are the same. I wrote a custom MR job modeled on ImportTSV, except that my input is table A. When I execute the MR job, some map() input rowkeys fall outside the range of the current Region (obtained from the InputSplit). This can lose data or pick up rows that do not belong to the split.
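To show the shape of the problem, here is a minimal sketch of the detection logic (a sketch only, against the 0.94-era org.apache.hadoop.hbase.mapreduce API; unlike my full code below, it treats the end row as exclusive and allows for the empty end row of a table's last region):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;

// Minimal sketch: record the split's range in setup(), then flag any map()
// key that falls outside it. Detection logic only, not the full rebuild job.
public class RangeCheckMapper extends TableMapper<ImmutableBytesWritable, Put> {
    private byte[] startRow;
    private byte[] endRow;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        TableSplit split = (TableSplit) context.getInputSplit();
        startRow = split.getStartRow();
        endRow = split.getEndRow();
    }

    @Override
    protected void map(ImmutableBytesWritable row, Result columns, Context context) {
        byte[] rowkey = row.get();
        // A key belongs to this split when startRow <= rowkey < endRow;
        // an empty endRow means the split is unbounded above (last region).
        if (Bytes.compareTo(startRow, rowkey) > 0
                || (endRow.length > 0 && Bytes.compareTo(endRow, rowkey) <= 0)) {
            System.err.println("out-of-range rowkey: " + Bytes.toStringBinary(rowkey));
        }
    }
}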
Code:
========================================================================
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RebuildTable{

    public final static class RebuildMapper extends TableMapper<ImmutableBytesWritable, Put>{
        public boolean isOutputRel = true;
        public boolean isOutputData = true;
        private static byte[] DOC_FAMILY = Bytes.toBytes("doc");
        private static byte[] URL_QUALIFIER = Bytes.toBytes("url");
        private static byte[] FWDURL_QUALIFIER = Bytes.toBytes("forward_url");
        private static byte[] PKEY_QUALIFIER = Bytes.toBytes("rel_pkey");
        private static byte[] DATAKEY_QUALIFIER = Bytes.toBytes("data_key");
        private static byte[] TN_QUALIFIER = Bytes.toBytes("table_name");
        private static byte[] CURL_QUALIFIER = Bytes.toBytes("c_url");
        private Logger logger = LoggerFactory.getLogger(RebuildMapper.class);
        // Values assumed for compilation; the constants are not shown in the original.
        protected static final int weibo_type = 1;
        protected static final int ebusiness_type = 2;
        protected int type = -1;
        protected long count = 0;
        private HTable relTable = null;
        private String table_name = null;
        private String table_ng_name = null;
        private String location = null;
        private byte[] start_row = null;
        private byte[] end_row = null;

        @Override
        protected void setup(Context context){
            type = Integer.valueOf(context.getConfiguration().get("job.hylanda.data_type"));
            // initialize an xxxx_rel_ng table object
            try {
                System.out.println(table_name + "=>" + table_ng_name);
                TableSplit split = (TableSplit)context.getInputSplit();
                if(split != null){
                    start_row = split.getStartRow();
                    end_row = split.getEndRow();
                    System.out.println(split.toString());
                    location = split.getRegionLocation();
                    System.out.println(String.format("location=%1$s,start_row=%2$s , end_row=%3$s",
                            location, HbaseUtil.printRowkey(start_row), HbaseUtil.printRowkey(end_row)));
                }
                isOutputRel = context.getConfiguration().getBoolean("conf.output_rel", true);
                isOutputData = context.getConfiguration().getBoolean("conf.output_data", true);
                table_name = context.getConfiguration().get("conf.table_name");
                table_ng_name = context.getConfiguration().get("conf.table_ng_name");
                if(isOutputRel){
                    Configuration conf = new Configuration(context.getConfiguration());
                    conf.setLong("hbase.htable.threads.keepalivetime", 180);
                    relTable = new HTable(conf, context.getConfiguration().get("conf.reltable_name"));
                    relTable.setAutoFlush(false);
                }
            } catch (Exception e) {
                // TODO auto-generated catch block
                logger.error("setup ex:" + e);
                e.printStackTrace();
            }
        }

        @Override
        protected void cleanup(Context context){
            if(relTable != null){
                try {
                    relTable.flushCommits();
                    relTable.close();
                } catch (IOException e) {
                    // TODO auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void map(ImmutableBytesWritable row, Result columns, Context context) {
            try{
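                // Test code below: verify this row's key falls inside the split's
                // [start_row, end_row) range captured in setup(), logging and
                // skipping keys that do not. (Caveat: for a table's last region
                // end_row is empty, which this comparison does not special-case.)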
                byte[] rowkey = row.get();
                if(Bytes.compareTo(start_row, rowkey) > 0 || Bytes.compareTo(end_row, rowkey) < 0){ //test code
                    TableSplit split = (TableSplit)context.getInputSplit();
                    if(split != null){
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        System.err.println(String.format("%5$s\tlocation=%1$s,start_row=%2$s ,rowkey=%3$s ,end_row=%4$s",
                                split.getRegionLocation(), HbaseUtil.printRowkey(split.getStartRow()),
                                HbaseUtil.printRowkey(rowkey), HbaseUtil.printRowkey(split.getEndRow()), sdf.format(new Date())));
                    }
                    return;
                }
                if(count++ % 10000 == 0) {
                    logger.info("Scan=" + count + " ;rowkey=" + HbaseUtil.printRowkey(rowkey));
                }
                String url = Bytes.toString(columns.getValue(DOC_FAMILY, URL_QUALIFIER));
                long rcrc = GenUrlCrc64.GenReverseCrc64Long(url); // gen 64-bit crc
                Bytes.putLong(rowkey, 0, rcrc);
                Put put = new Put(rowkey); // Put for the ng table
                List<Put> puts = new ArrayList<Put>(); // Puts for the rel table
                if(type == weibo_type){
                    for(KeyValue kv : columns.list()){
                        if(Bytes.toString(kv.getQualifier()).equals("rel_pkey")){
                            byte[] pkey = columns.getValue(DOC_FAMILY, PKEY_QUALIFIER);
                            String pkurl = Bytes.toString(columns.getValue(DOC_FAMILY, FWDURL_QUALIFIER)); // take the original url string
                            Bytes.putLong(pkey, 0, GenUrlCrc64.GenReverseCrc64Long(pkurl));
                            put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), pkey);
                            Put put_rel = new Put(Bytes.toBytes(GenUrlCrc64.GenCrc64Long(pkurl)));
                            put_rel.add(DOC_FAMILY, Bytes.add(Bytes.toBytes("rel_"), rowkey), Bytes.toBytes(table_ng_name));
                            puts.add(put_rel);
                        }else{
                            put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());
                        }
                    }
                }else if(type == ebusiness_type){
                    for(KeyValue kv : columns.list()){
                        if(Bytes.toString(kv.getQualifier()).equals("rel_pkey")){
                            String pkurl = Bytes.toString(columns.getValue(DOC_FAMILY, CURL_QUALIFIER)); // take the original url string
                            byte[] pkey = columns.getValue(DOC_FAMILY, PKEY_QUALIFIER);
                            Bytes.putLong(pkey, 0, GenUrlCrc64.GenReverseCrc64Long(pkurl));
                            put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), pkey);
                            Put put_rel = new Put(Bytes.toBytes(GenUrlCrc64.GenCrc64Long(pkurl)));
                            put_rel.add(DOC_FAMILY, Bytes.add(Bytes.toBytes("rel_"), rowkey), Bytes.toBytes(table_ng_name));
                            puts.add(put_rel);
                        }else{
                            put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());
                        }
                    }
                }else{
                    for(KeyValue kv : columns.list()){
                        put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());
                    }
                }
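                // Write the rebuilt row. The while/try pattern below retries
                // context.write() until it succeeds; a success breaks out of the
                // loop, a failure is logged and retried.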
                while(isOutputData){
                    try{
                        context.write(new ImmutableBytesWritable(rowkey), put);
                        break;
                    }catch(Exception ex){
                        logger.error("context write ex:" + ex);
                    }
                }
                // write basic info to the rel table
                byte[] urlcrc = Bytes.tail(rowkey, 8);
                Put putRel = new Put(urlcrc);
                putRel.add(DOC_FAMILY, DATAKEY_QUALIFIER, rowkey);
                putRel.add(DOC_FAMILY, TN_QUALIFIER, Bytes.toBytes(table_ng_name));
                puts.add(putRel);
                while(isOutputRel && relTable != null){
                    try{
                        relTable.put(puts);
                        break;
                    }catch(Exception ex){
                        logger.error("put ex:" + ex.toString());
                    }
                }
                context.getCounter("Rebuild", "success").increment(1);
            }catch(Exception ex){
                System.err.println("Err:" + ex + ",row:" + Bytes.toStringBinary(row.get()));
                context.getCounter("Rebuild", "failed").increment(1);
            }
        }
    }

    public static void main(String[] argv) throws Exception {

        String hdfsip = "10.0.5.34";
        String zkIps = "10.0.5.34";
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration hbaseConfiguration = HBaseConfiguration.create(conf);
        hbaseConfiguration.set("mapred.job.priority", JobPriority.HIGH.name());
        hbaseConfiguration.set("fs.default.name", "hdfs://" + hdfsip + ":9000");
        hbaseConfiguration.set("mapred.job.tracker", hdfsip + ":9001");
        hbaseConfiguration.set("hbase.zookeeper.quorum", zkIps);
        hbaseConfiguration.set("hbase.zookeeper.property.clientPort", "2181");
        hbaseConfiguration.set("mapred.reduce.tasks.speculative.execution", "false");
        hbaseConfiguration.set("mapred.map.tasks.speculative.execution", "false");
        hbaseConfiguration.set("mapred.job.queue.name", "default");
        hbaseConfiguration.set("mapred.child.java.opts", "-Xmx1024m");
        hbaseConfiguration.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, 180000);
        hbaseConfiguration.setLong("dfs.socket.timeout", 180000);
        hbaseConfiguration.setLong("hbase.htable.threads.keepalivetime", 180);

        String tablename = "news_201411";
        String tablename_ng = "news_201411_ng";
        String tablename_rel_ng = "news_rel_ng";
        HbaseUtil.createRelTable(hbaseConfiguration, tablename_rel_ng); // create another table

        hbaseConfiguration.set("conf.table_name", tablename);
        hbaseConfiguration.set("conf.table_ng_name", tablename_ng);
        hbaseConfiguration.set("conf.reltable_name", tablename_rel_ng);

        Job job = new Job(hbaseConfiguration, "job_rebuilder_" + tablename);
        job.setJarByClass(RebuildMapper.class);

        List<Scan> scans = new ArrayList<Scan>();
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tablename));
        scan.setCaching(100);
        scans.add(scan);
        TableMapReduceUtil.initTableMapperJob(scans, RebuildMapper.class, ImmutableBytesWritable.class, Put.class, job);
        job.setReducerClass(PutSortReducer.class);
        String hfileOutPath = "/user/hadoop/" + tablename_ng;
        Path outputDir = new Path(hfileOutPath);
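        // Configure HFile output for bulk loading: PutSortReducer sorts the Puts,
        // and HFileOutputFormat.configureIncrementalLoad() partitions the job
        // output to match the opened table's current region boundaries. (Note:
        // the HTable below is opened on the source table, tablename, so the
        // partitioning follows the source table's regions.)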
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        HTable table = new HTable(hbaseConfiguration, tablename);
        HFileOutputFormat.configureIncrementalLoad(job, table);
        System.out.println("set job begin,'" + tablename + "' => '" + tablename_ng + "'");
        boolean bCompleted = job.waitForCompletion(true);
    }
}

> MR map's input rowkey out of range of current Region
> -----------------------------------------------------
>
>                 Key: HBASE-12757
>                 URL: https://issues.apache.org/jira/browse/HBASE-12757
>             Project: HBase
>          Issue Type: Bug
>          Components: Client, hbase
>    Affects Versions: 0.94.7
>        Environment: hadoop 1.1.2, r1440782
>                     hbase 0.94.7
>                     linux 2.6.32-279.el6.x86_64
>            Reporter: pangxiaoxi
>            Priority: Critical
>
> I execute a MapReduce job that scans the whole table; sometimes a map input rowkey is outside the range of the current Region (obtained from the InputSplit).
> This can lose data or pick up rows that do not belong to the split.
> P.S. I want to use an ImportTSV-style job to translate the table.
> e.g.
> location=datanode11,start_row=D9CB114FD09A82A3_0000000000000000_m_43DAAA689D4AFC86 ,rowkey=D323E1D0A51E5185_0000000000000000_m_75686B8924108044 ,end_row=DB0C4FC44E6D80C1_0000000000000000_m_E956CC65322BA3E5

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)