Return-Path: Delivered-To: apmail-hadoop-core-user-archive@www.apache.org Received: (qmail 19808 invoked from network); 23 Oct 2008 06:09:50 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 23 Oct 2008 06:09:50 -0000 Received: (qmail 49911 invoked by uid 500); 23 Oct 2008 06:09:48 -0000 Delivered-To: apmail-hadoop-core-user-archive@hadoop.apache.org Received: (qmail 49862 invoked by uid 500); 23 Oct 2008 06:09:48 -0000 Mailing-List: contact core-user-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-user@hadoop.apache.org Delivered-To: mailing list core-user@hadoop.apache.org Received: (qmail 49839 invoked by uid 99); 23 Oct 2008 06:09:48 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Oct 2008 23:09:48 -0700 X-ASF-Spam-Status: No, hits=-1.0 required=10.0 tests=RCVD_IN_DNSWL_LOW,SPF_HELO_PASS,SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: domain of gcjlhu-hadoop-user@m.gmane.org designates 80.91.229.2 as permitted sender) Received: from [80.91.229.2] (HELO ciao.gmane.org) (80.91.229.2) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Oct 2008 06:08:37 +0000 Received: from list by ciao.gmane.org with local (Exim 4.43) id 1KstKa-0001vM-NP for core-user@hadoop.apache.org; Thu, 23 Oct 2008 06:06:00 +0000 Received: from adsl-70-235-18-238.dsl.ltrkar.sbcglobal.net ([70.235.18.238]) by main.gmane.org with esmtp (Gmexim 0.1 (Debian)) id 1AlnuQ-0007hv-00 for ; Thu, 23 Oct 2008 06:06:00 +0000 Received: from sales by adsl-70-235-18-238.dsl.ltrkar.sbcglobal.net with local (Gmexim 0.1 (Debian)) id 1AlnuQ-0007hv-00 for ; Thu, 23 Oct 2008 06:06:00 +0000 X-Injected-Via-Gmane: http://gmane.org/ To: core-user@hadoop.apache.org From: "Billy Pearson" Subject: Re: Improving locality of table access... 
Date: Thu, 23 Oct 2008 01:05:47 -0500 Lines: 497 Message-ID: References: <6fe3d1ae0810221514w67eb9d3enb77e64059d921870@mail.gmail.com> Mime-Version: 1.0 Content-Type: text/plain; format=flowed; charset="iso-8859-1"; reply-type=original Content-Transfer-Encoding: 7bit X-Complaints-To: usenet@ger.gmane.org X-Gmane-NNTP-Posting-Host: adsl-70-235-18-238.dsl.ltrkar.sbcglobal.net In-Reply-To: <6fe3d1ae0810221514w67eb9d3enb77e64059d921870@mail.gmail.com> X-MSMail-Priority: Normal X-Newsreader: Microsoft Windows Mail 6.0.6001.18000 X-MimeOLE: Produced By Microsoft MimeOLE V6.0.6001.18049 Sender: news X-Virus-Checked: Checked by ClamAV on apache.org generate a patch and post it here https://issues.apache.org/jira/browse/HBASE-675 Billy "Arthur van Hoff" wrote in message news:6fe3d1ae0810221514w67eb9d3enb77e64059d921870@mail.gmail.com... > Hi, > > Below is some code for improving the read performance of large tables by > processing each region on the host holding that region. We measured 50-60% > lower network bandwidth. > > To use this class instead of > org.apache.hadoop.hbase.mapred.TableInputFormat > class use: > > jobconf.setInputFormat(ellerdale.mapreduce.TableInputFormatFix); > > Please send me feedback, if you can think of better ways to do this. > > -- > Arthur van Hoff - Grand Master of Alphabetical Order > The Ellerdale Project, Menlo Park, CA > avh@ellerdale.com, 650-283-0842 > > > -- TableInputFormatFix.java -- > > /** > * Licensed to the Apache Software Foundation (ASF) under one > * or more contributor license agreements. See the NOTICE file > * distributed with this work for additional information > * regarding copyright ownership. The ASF licenses this file > * to you under the Apache License, Version 2.0 (the > * "License"); you may not use this file except in compliance > * with the License. 
You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > // Author: Arthur van Hoff, avh@ellerdale.com > > package ellerdale.mapreduce; > > import java.io.*; > import java.util.*; > > import org.apache.hadoop.io.*; > import org.apache.hadoop.fs.*; > import org.apache.hadoop.util.*; > import org.apache.hadoop.conf.*; > import org.apache.hadoop.mapred.*; > > import org.apache.hadoop.hbase.*; > import org.apache.hadoop.hbase.mapred.*; > import org.apache.hadoop.hbase.client.*; > import org.apache.hadoop.hbase.client.Scanner; > import org.apache.hadoop.hbase.io.*; > import org.apache.hadoop.hbase.util.*; > > // > // Attempt to fix the localized nature of table segments. > // Compute table splits so that they are processed locally. > // Combine multiple splits to avoid the number of splits exceeding > numSplits. > // Sort the resulting splits so that the shortest ones are processed last. > // The resulting savings in network bandwidth are significant (we measured > 60%). > // > public class TableInputFormatFix extends TableInputFormat > { > public static final int ORIGINAL = 0; > public static final int LOCALIZED = 1; > public static final int OPTIMIZED = 2; // not yet functional > > // > // A table split with a location. 
> // > static class LocationTableSplit extends TableSplit implements > Comparable > { > String location; > > public LocationTableSplit() > { > } > public LocationTableSplit(byte [] tableName, byte [] startRow, byte [] > endRow, String location) > { > super(tableName, startRow, endRow); > this.location = location; > } > public String[] getLocations() > { > return new String[] {location}; > } > public void readFields(DataInput in) throws IOException > { > super.readFields(in); > this.location = Bytes.toString(Bytes.readByteArray(in)); > } > public void write(DataOutput out) throws IOException > { > super.write(out); > Bytes.writeByteArray(out, Bytes.toBytes(location)); > } > public int compareTo(Object other) > { > LocationTableSplit otherSplit = (LocationTableSplit)other; > int result = Bytes.compareTo(getStartRow(), > otherSplit.getStartRow()); > return result; > } > public String toString() > { > return location.substring(0, location.indexOf('.')) + ": " + > Bytes.toString(getStartRow()) + "-" + Bytes.toString(getEndRow()); > } > } > > // > // A table split with a location that covers multiple regions. 
> // > static class MultiRegionTableSplit extends LocationTableSplit > { > byte[][] regions; > > public MultiRegionTableSplit() > { > } > public MultiRegionTableSplit(byte[] tableName, String location, > byte[][] > regions) throws IOException > { > super(tableName, regions[0], regions[regions.length-1], location); > this.location = location; > this.regions = regions; > } > public void readFields(DataInput in) throws IOException > { > super.readFields(in); > int n = in.readInt(); > regions = new byte[n][]; > for (int i = 0 ; i < n ; i++) { > regions[i] = Bytes.readByteArray(in); > } > } > public void write(DataOutput out) throws IOException > { > super.write(out); > out.writeInt(regions.length); > for (int i = 0 ; i < regions.length ; i++) { > Bytes.writeByteArray(out, regions[i]); > } > } > public String toString() > { > String str = location.substring(0, location.indexOf('.')) + ": "; > for (int i = 0 ; i < regions.length ; i += 2) { > if (i > 0) { > str += ", "; > } > str += Bytes.toString(regions[i]) + "-" + > Bytes.toString(regions[i+1]); > } > return str; > } > public int compareTo(Object other) > { > MultiRegionTableSplit otherSplit = (MultiRegionTableSplit)other; > int result = otherSplit.regions.length - regions.length; > if (result == 0) { > result = Bytes.compareTo(getStartRow(), otherSplit.getStartRow()); > } > return result; > } > } > > // > // TableRecordReader that can handle multiple regions. 
> // > protected class MultiRegionTableRecordReader implements > RecordReader > { > private HTable htable; > private byte [][] trrInputColumns; > > private int currentregion; > private byte[][] regions; > private byte [] lastRow; > private Scanner scanner; > > void setHTable(HTable htable) > { > this.htable = htable; > } > void setInputColumns(final byte [][] inputColumns) > { > this.trrInputColumns = inputColumns; > } > void setRegions(byte[][] regions) > { > this.regions = regions; > } > > int getRegion(byte[] row) > { > for (int i = 0 ; i < regions.length ; i += 2) { > byte[] startRow = regions[i + 0]; > byte[] endRow = regions[i + 1]; > if (startRow.length > 0 && Bytes.compareTo(startRow, row) > 0) { > continue; > } > if (endRow.length > 0 && Bytes.compareTo(row, endRow) >= 0) { > continue; > } > return i/2; > } > return -1; > } > > // > // The buisiness end the the reader > // > public void init() throws IOException > { > restart(regions[0]); > } > public void restart(byte[] row) throws IOException > { > currentregion = getRegion(row); > byte[] startRow = regions[currentregion*2 + 0]; > byte[] endRow = regions[currentregion*2 + 1]; > if (endRow.length > 0) { > scanner = htable.getScanner(trrInputColumns, startRow, endRow); > } else { > scanner = htable.getScanner(trrInputColumns, startRow); > } > } > > public ImmutableBytesWritable createKey() > { > return new ImmutableBytesWritable(); > } > public RowResult createValue() > { > return new RowResult(); > } > > public long getPos() > { > return 0; > } > public float getProgress() > { > int nregions = regions.length/2; > return ((100 * (2 * currentregion + 1)) / (2 * nregions)) / 100f; > } > > public boolean next(ImmutableBytesWritable key, RowResult value) throws > IOException > { > while (true) { > RowResult result; > try { > result = scanner.next(); > } catch (UnknownScannerException e) { > restart(lastRow); > scanner.next(); // skip presumed already mapped row > result = scanner.next(); > } > boolean hasMore 
= result != null && result.size() > 0; > if (!hasMore && currentregion+1 < regions.length/2) { > // move to the next region > restart(regions[(currentregion+1)*2]); > continue; > } > if (hasMore) { > key.set(result.getRow()); > lastRow = key.get(); > Writables.copyWritable(result, value); > } > return hasMore; > } > } > > public void close() > { > scanner.close(); > } > } > > // > // Main class > // > > int type; > HTable table; > byte[][] inputColumns; > MultiRegionTableRecordReader reader; > > public TableInputFormatFix() > { > this(OPTIMIZED); > } > public TableInputFormatFix(int type) > { > this.type = type; > this.reader = new MultiRegionTableRecordReader(); > } > > protected void setHTable(HTable table) > { > this.table = table; > super.setHTable(table); > } > protected void setInputColums(byte [][] inputColumns) > { > this.inputColumns = inputColumns; > super.setInputColums(inputColumns); > } > > // > // Create RecordReader > // > public RecordReader > getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws > IOException > { > TableSplit tSplit = (TableSplit) split; > MultiRegionTableRecordReader trr = reader; > trr.setHTable(this.table); > trr.setInputColumns(this.inputColumns); > //trr.setRowFilter(this.rowFilter); > if (tSplit instanceof MultiRegionTableSplit) { > trr.setRegions(((MultiRegionTableSplit)tSplit).regions); > } else { > trr.setRegions(new byte[][] {tSplit.getStartRow(), > tSplit.getEndRow()}); > } > trr.init(); > return trr; > } > > // > // Compute the splits. 
> // > public InputSplit[] getSplits(JobConf job, int numSplits) throws > IOException > { > InputSplit[] splits = null; > byte [][] startKeys = this.table.getStartKeys(); > if (startKeys == null || startKeys.length == 0) { > throw new IOException("Expecting at least one region"); > } > if (this.table == null) { > throw new IOException("No table was provided"); > } > if (this.inputColumns == null || this.inputColumns.length == 0) { > throw new IOException("Expecting at least one column"); > } > > switch (type) { > case ORIGINAL: > { > // This is the original algorithm with no locations. > int realNumSplits = numSplits > startKeys.length ? startKeys.length > : numSplits; > splits = new InputSplit[realNumSplits]; > int middle = startKeys.length / realNumSplits; > int startPos = 0; > for (int i = 0; i < realNumSplits; i++) { > int lastPos = startPos + middle; > lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : > lastPos; > splits[i] = new TableSplit(this.table.getTableName(), > startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos] : > HConstants.EMPTY_START_ROW); > > startPos = lastPos; > } > break; > } > case LOCALIZED: > { > // This is the original algorithm with a minor fix for adding the > likely location of each region. > int realNumSplits = numSplits > startKeys.length ? startKeys.length > : numSplits; > splits = new InputSplit[realNumSplits]; > int middle = startKeys.length / realNumSplits; > int startPos = 0; > for (int i = 0; i < realNumSplits; i++) { > int lastPos = startPos + middle; > lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : > lastPos; > String regionLocation = > table.getRegionLocation(startKeys[startPos]).getServerAddress().getHostname(); > splits[i] = new LocationTableSplit(this.table.getTableName(), > startKeys[startPos], ((i + 1) < realNumSplits) ? 
startKeys[lastPos] : > HConstants.EMPTY_START_ROW, regionLocation); > > startPos = lastPos; > } > break; > } > case OPTIMIZED: > { > // This is an optimized algorithm that bonds multiple regions > together in each split. > int nregions = 0; > int nhosts = 0; > HashMap> hosts = new HashMap ArrayList>(); > for (java.util.Map.Entry e : > table.getRegionsInfo().entrySet()) { > String host = e.getValue().getHostname(); > ArrayList regions = hosts.get(host); > if (regions == null) { > regions = new ArrayList(); > hosts.put(host, regions); > nhosts++; > } > regions.add(e.getKey()); > nregions++; > } > if (numSplits < nhosts) { > numSplits = nhosts; > } > if (numSplits > nregions) { > numSplits = nregions; > } > float sph = (float)numSplits/nhosts; > float sphremainder = 0f; > > ArrayList splitlist = new ArrayList(); > for (String host : hosts.keySet()) { > ArrayList regions = hosts.get(host); > float rps = ((regions.size() - 1) + sphremainder) / (sph - 1); > sphremainder = sph; > float rpsremainder = 0f; > for (int i = 0 ; i < regions.size() ;) { > rpsremainder += rps; > int splitSize = Math.max(1, (int)rpsremainder); > if (i + splitSize > regions.size()) { > splitSize = regions.size() - i; > } > //System.out.println(host + ": " + numSplits + "/" + nregions + > "/" + splitSize + ":"); > byte[][] splitregions = new byte[splitSize*2][]; > for (int j = 0 ; j < splitSize ; j++) { > HRegionInfo region = regions.get(i + j); > splitregions[j*2 + 0] = region.getStartKey(); > splitregions[j*2 + 1] = region.getEndKey(); > } > splitlist.add(new MultiRegionTableSplit(table.getTableName(), > host, splitregions)); > > i += splitSize; > rpsremainder -= splitSize; > sphremainder -= 1; > } > } > > // copy into a real array (there must be a better way) > int n = splitlist.size(); > //n = 1; > splits = new InputSplit[n]; > for (int i = splits.length ; i-- > 0 ;) { > splits[i] = splitlist.get(i); > } > Arrays.sort(splits); > break; > } > } > > // dump the splits > if (false) { > 
System.out.println("---- " + splits.length + " splits ----"); > for (int i = 0 ; i < splits.length ; i++) { > System.out.println(i + ": " + splits[i].toString()); > } > System.exit(1); > } > > // return the splits > return splits; > } > } >