hadoop-common-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Jim Kellerman (POWERSET)" <Jim.Keller...@microsoft.com>
Subject RE: Improving locality of table access...
Date Wed, 22 Oct 2008 22:35:45 GMT
In the future, you should send HBase questions to the HBase user
mailing list: hbase-user@hadoop.apache.org if you want to get
a more timely response. HBase development is disconnected from
Hadoop development for the most part.

---
Jim Kellerman, Powerset (Live Search, Microsoft Corporation)


> -----Original Message-----
> From: Arthur van Hoff [mailto:avh@ellerdale.com]
> Sent: Wednesday, October 22, 2008 3:14 PM
> To: core-user@hadoop.apache.org
> Subject: Improving locality of table access...
>
> Hi,
>
> Below is some code for improving the read performance of large tables by
> processing each region on the host holding that region. We measured 50-60%
> lower network bandwidth.
>
> To use this class instead of
> org.apache.hadoop.hbase.mapred.TableInputFormat
> class use:
>
>     jobconf.setInputFormat(ellerdale.mapreduce.TableInputFormatFix);
>
> Please send me feedback, if you can think off better ways to do this.
>
> --
> Arthur van Hoff - Grand Master of Alphabetical Order
> The Ellerdale Project, Menlo Park, CA
> avh@ellerdale.com, 650-283-0842
>
>
> -- TableInputFormatFix.java --
>
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> // Author: Arthur van Hoff, avh@ellerdale.com
>
> package ellerdale.mapreduce;
>
> import java.io.*;
> import java.util.*;
>
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.util.*;
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.mapred.*;
>
> import org.apache.hadoop.hbase.*;
> import org.apache.hadoop.hbase.mapred.*;
> import org.apache.hadoop.hbase.client.*;
> import org.apache.hadoop.hbase.client.Scanner;
> import org.apache.hadoop.hbase.io.*;
> import org.apache.hadoop.hbase.util.*;
>
> //
> // Attempt to fix the localized nature of table segments.
> // Compute table splits so that they are processed locally.
> // Combine multiple splits to avoid the number of splits exceeding
> numSplits.
> // Sort the resulting splits so that the shortest ones are processed last.
> // The resulting savings in network bandwidth are significant (we measured
> 60%).
> //
> public class TableInputFormatFix extends TableInputFormat
> {
>     public static final int ORIGINAL    = 0;
>     public static final int LOCALIZED    = 1;
>     public static final int OPTIMIZED    = 2;    // not yet functional
>
>     //
>     // A table split with a location.
>     //
>     static class LocationTableSplit extends TableSplit implements
> Comparable
>     {
>     String location;
>
>     public LocationTableSplit()
>     {
>     }
>     public LocationTableSplit(byte [] tableName, byte [] startRow, byte []
> endRow, String location)
>     {
>         super(tableName, startRow, endRow);
>         this.location = location;
>     }
>     public String[] getLocations()
>     {
>         return new String[] {location};
>     }
>     public void readFields(DataInput in) throws IOException
>     {
>         super.readFields(in);
>         this.location = Bytes.toString(Bytes.readByteArray(in));
>     }
>     public void write(DataOutput out) throws IOException
>     {
>         super.write(out);
>         Bytes.writeByteArray(out, Bytes.toBytes(location));
>     }
>     public int compareTo(Object other)
>     {
>         LocationTableSplit otherSplit = (LocationTableSplit)other;
>         int result = Bytes.compareTo(getStartRow(),
> otherSplit.getStartRow());
>         return result;
>     }
>     public String toString()
>     {
>         return location.substring(0, location.indexOf('.')) + ": " +
> Bytes.toString(getStartRow()) + "-" + Bytes.toString(getEndRow());
>     }
>     }
>
>     //
>     // A table split with a location that covers multiple regions.
>     //
>     static class MultiRegionTableSplit extends LocationTableSplit
>     {
>     byte[][] regions;
>
>     public MultiRegionTableSplit()
>     {
>     }
>     public MultiRegionTableSplit(byte[] tableName, String location,
> byte[][]
> regions) throws IOException
>     {
>         super(tableName, regions[0], regions[regions.length-1], location);
>         this.location = location;
>         this.regions = regions;
>     }
>     public void readFields(DataInput in) throws IOException
>     {
>         super.readFields(in);
>         int n = in.readInt();
>         regions = new byte[n][];
>         for (int i = 0 ; i < n ; i++) {
>         regions[i] = Bytes.readByteArray(in);
>         }
>     }
>     public void write(DataOutput out) throws IOException
>     {
>         super.write(out);
>         out.writeInt(regions.length);
>         for (int i = 0 ; i < regions.length ; i++) {
>         Bytes.writeByteArray(out, regions[i]);
>         }
>     }
>     public String toString()
>     {
>         String str = location.substring(0, location.indexOf('.')) + ": ";
>         for (int i = 0 ; i < regions.length ; i += 2) {
>         if (i > 0) {
>             str += ", ";
>         }
>         str += Bytes.toString(regions[i]) + "-" +
> Bytes.toString(regions[i+1]);
>         }
>         return str;
>     }
>     public int compareTo(Object other)
>     {
>         MultiRegionTableSplit otherSplit = (MultiRegionTableSplit)other;
>         int result = otherSplit.regions.length - regions.length;
>         if (result == 0) {
>         result = Bytes.compareTo(getStartRow(), otherSplit.getStartRow());
>         }
>         return result;
>     }
>     }
>
>     //
>     // TableRecordReader that can handle multiple regions.
>     //
>     protected class MultiRegionTableRecordReader implements
> RecordReader<ImmutableBytesWritable, RowResult>
>     {
>     private HTable htable;
>     private byte [][] trrInputColumns;
>
>     private int currentregion;
>     private byte[][] regions;
>     private byte [] lastRow;
>     private Scanner scanner;
>
>     void setHTable(HTable htable)
>     {
>         this.htable = htable;
>     }
>     void setInputColumns(final byte [][] inputColumns)
>     {
>         this.trrInputColumns = inputColumns;
>     }
>     void setRegions(byte[][] regions)
>     {
>         this.regions = regions;
>     }
>
>     int getRegion(byte[] row)
>     {
>         for (int i = 0 ; i < regions.length ; i += 2) {
>         byte[] startRow = regions[i + 0];
>         byte[] endRow = regions[i + 1];
>         if (startRow.length > 0 && Bytes.compareTo(startRow, row) > 0)
{
>             continue;
>         }
>         if (endRow.length > 0 && Bytes.compareTo(row, endRow) >= 0) {
>             continue;
>         }
>         return i/2;
>         }
>         return -1;
>     }
>
>     //
>     // The buisiness end the the reader
>     //
>     public void init() throws IOException
>     {
>         restart(regions[0]);
>     }
>     public void restart(byte[] row) throws IOException
>     {
>         currentregion = getRegion(row);
>         byte[] startRow = regions[currentregion*2 + 0];
>         byte[] endRow = regions[currentregion*2 + 1];
>         if (endRow.length > 0) {
>         scanner = htable.getScanner(trrInputColumns, startRow, endRow);
>         } else {
>         scanner = htable.getScanner(trrInputColumns, startRow);
>         }
>     }
>
>     public ImmutableBytesWritable createKey()
>     {
>         return new ImmutableBytesWritable();
>     }
>     public RowResult createValue()
>     {
>         return new RowResult();
>     }
>
>     public long getPos()
>     {
>         return 0;
>     }
>     public float getProgress()
>     {
>         int nregions = regions.length/2;
>         return ((100 * (2 * currentregion + 1)) / (2 * nregions)) / 100f;
>     }
>
>     public boolean next(ImmutableBytesWritable key, RowResult value)
> throws
> IOException
>     {
>         while (true) {
>         RowResult result;
>         try {
>             result = scanner.next();
>         } catch (UnknownScannerException e) {
>             restart(lastRow);
>             scanner.next();    // skip presumed already mapped row
>             result = scanner.next();
>         }
>         boolean hasMore = result != null && result.size() > 0;
>         if (!hasMore && currentregion+1 < regions.length/2) {
>             // move to the next region
>             restart(regions[(currentregion+1)*2]);
>             continue;
>         }
>         if (hasMore) {
>             key.set(result.getRow());
>             lastRow = key.get();
>             Writables.copyWritable(result, value);
>         }
>         return hasMore;
>         }
>     }
>
>     public void close()
>     {
>         scanner.close();
>     }
>     }
>
>     //
>     // Main class
>     //
>
>     int type;
>     HTable table;
>     byte[][] inputColumns;
>     MultiRegionTableRecordReader reader;
>
>     public TableInputFormatFix()
>     {
>     this(OPTIMIZED);
>     }
>     public TableInputFormatFix(int type)
>     {
>     this.type = type;
>     this.reader = new MultiRegionTableRecordReader();
>     }
>
>     protected void setHTable(HTable table)
>     {
>     this.table = table;
>     super.setHTable(table);
>     }
>     protected void setInputColums(byte [][] inputColumns)
>     {
>     this.inputColumns = inputColumns;
>     super.setInputColums(inputColumns);
>     }
>
>     //
>     // Create RecordReader
>     //
>     public RecordReader<ImmutableBytesWritable, RowResult>
> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws
> IOException
>     {
>     TableSplit tSplit = (TableSplit) split;
>     MultiRegionTableRecordReader trr = reader;
>     trr.setHTable(this.table);
>     trr.setInputColumns(this.inputColumns);
>     //trr.setRowFilter(this.rowFilter);
>     if (tSplit instanceof MultiRegionTableSplit) {
>         trr.setRegions(((MultiRegionTableSplit)tSplit).regions);
>     } else {
>         trr.setRegions(new byte[][] {tSplit.getStartRow(),
> tSplit.getEndRow()});
>     }
>     trr.init();
>     return trr;
>     }
>
>     //
>     // Compute the splits.
>     //
>     public InputSplit[] getSplits(JobConf job, int numSplits) throws
> IOException
>     {
>     InputSplit[] splits = null;
>     byte [][] startKeys = this.table.getStartKeys();
>     if (startKeys == null || startKeys.length == 0) {
>         throw new IOException("Expecting at least one region");
>     }
>     if (this.table == null) {
>         throw new IOException("No table was provided");
>     }
>     if (this.inputColumns == null || this.inputColumns.length == 0) {
>         throw new IOException("Expecting at least one column");
>     }
>
>     switch (type) {
>     case ORIGINAL:
>         {
>         // This is the original algorithm with no locations.
>         int realNumSplits = numSplits > startKeys.length ?
> startKeys.length
> : numSplits;
>         splits = new InputSplit[realNumSplits];
>         int middle = startKeys.length / realNumSplits;
>         int startPos = 0;
>         for (int i = 0; i < realNumSplits; i++) {
>             int lastPos = startPos + middle;
>             lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 :
> lastPos;
>             splits[i] = new TableSplit(this.table.getTableName(),
> startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos] :
> HConstants.EMPTY_START_ROW);
>
>             startPos = lastPos;
>         }
>         break;
>         }
>     case LOCALIZED:
>         {
>         // This is the original algorithm with a minor fix for adding the
> likely location of each region.
>         int realNumSplits = numSplits > startKeys.length ?
> startKeys.length
> : numSplits;
>         splits = new InputSplit[realNumSplits];
>         int middle = startKeys.length / realNumSplits;
>         int startPos = 0;
>         for (int i = 0; i < realNumSplits; i++) {
>             int lastPos = startPos + middle;
>             lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 :
> lastPos;
>             String regionLocation =
> table.getRegionLocation(startKeys[startPos]).getServerAddress().getHostnam
> e();
>             splits[i] = new LocationTableSplit(this.table.getTableName(),
> startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos] :
> HConstants.EMPTY_START_ROW, regionLocation);
>
>             startPos = lastPos;
>         }
>         break;
>         }
>     case OPTIMIZED:
>         {
>         // This is an optimized algorithm that bonds multiple regions
> together in each split.
>         int nregions = 0;
>         int nhosts = 0;
>         HashMap<String, ArrayList<HRegionInfo>> hosts = new
> HashMap<String,
> ArrayList<HRegionInfo>>();
>         for (java.util.Map.Entry<HRegionInfo,HServerAddress> e :
> table.getRegionsInfo().entrySet()) {
>             String host = e.getValue().getHostname();
>             ArrayList<HRegionInfo> regions = hosts.get(host);
>             if (regions == null) {
>             regions = new ArrayList<HRegionInfo>();
>             hosts.put(host, regions);
>             nhosts++;
>             }
>             regions.add(e.getKey());
>             nregions++;
>         }
>         if (numSplits < nhosts) {
>             numSplits = nhosts;
>         }
>         if (numSplits > nregions) {
>             numSplits = nregions;
>         }
>         float sph = (float)numSplits/nhosts;
>         float sphremainder = 0f;
>
>         ArrayList<InputSplit> splitlist = new ArrayList<InputSplit>();
>         for (String host : hosts.keySet()) {
>             ArrayList<HRegionInfo> regions = hosts.get(host);
>             float rps = ((regions.size() - 1) + sphremainder) / (sph - 1);
>             sphremainder = sph;
>             float rpsremainder = 0f;
>             for (int i = 0 ; i < regions.size() ;) {
>             rpsremainder += rps;
>             int splitSize = Math.max(1, (int)rpsremainder);
>             if (i + splitSize > regions.size()) {
>                 splitSize = regions.size() - i;
>             }
>             //System.out.println(host + ": " + numSplits + "/" + nregions
> +
> "/" + splitSize + ":");
>             byte[][] splitregions = new byte[splitSize*2][];
>             for (int j = 0 ; j < splitSize ; j++) {
>                 HRegionInfo region = regions.get(i + j);
>                 splitregions[j*2 + 0] = region.getStartKey();
>                 splitregions[j*2 + 1] = region.getEndKey();
>             }
>             splitlist.add(new MultiRegionTableSplit(table.getTableName(),
> host, splitregions));
>
>             i += splitSize;
>             rpsremainder -= splitSize;
>             sphremainder -= 1;
>             }
>         }
>
>         // copy into a real array (there must be a better way)
>         int n = splitlist.size();
>         //n = 1;
>         splits = new InputSplit[n];
>         for (int i = splits.length ; i-- > 0 ;) {
>             splits[i] = splitlist.get(i);
>         }
>         Arrays.sort(splits);
>         break;
>         }
>     }
>
>     // dump the splits
>     if (false) {
>         System.out.println("---- " + splits.length + " splits ----");
>         for (int i = 0 ; i < splits.length ; i++) {
>         System.out.println(i + ": " + splits[i].toString());
>         }
>         System.exit(1);
>     }
>
>     // return the splits
>     return splits;
>     }
> }

Mime
View raw message