hadoop-common-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Billy Pearson" <sa...@pearsonwholesale.com>
Subject Re: Improving locality of table access...
Date Thu, 23 Oct 2008 06:05:47 GMT
generate a patch and post it here
https://issues.apache.org/jira/browse/HBASE-675

Billy

"Arthur van Hoff" <avh@ellerdale.com> wrote in 
message news:6fe3d1ae0810221514w67eb9d3enb77e64059d921870@mail.gmail.com...
> Hi,
>
> Below is some code for improving the read performance of large tables by
> processing each region on the host holding that region. We measured 50-60%
> lower network bandwidth.
>
> To use this class instead of 
> org.apache.hadoop.hbase.mapred.TableInputFormat
> class use:
>
>    jobconf.setInputFormat(ellerdale.mapreduce.TableInputFormatFix.class);
>
> Please send me feedback if you can think of better ways to do this.
>
> -- 
> Arthur van Hoff - Grand Master of Alphabetical Order
> The Ellerdale Project, Menlo Park, CA
> avh@ellerdale.com, 650-283-0842
>
>
> -- TableInputFormatFix.java --
>
> /**
> * Licensed to the Apache Software Foundation (ASF) under one
> * or more contributor license agreements.  See the NOTICE file
> * distributed with this work for additional information
> * regarding copyright ownership.  The ASF licenses this file
> * to you under the Apache License, Version 2.0 (the
> * "License"); you may not use this file except in compliance
> * with the License.  You may obtain a copy of the License at
> *
> *     http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> // Author: Arthur van Hoff, avh@ellerdale.com
>
> package ellerdale.mapreduce;
>
> import java.io.*;
> import java.util.*;
>
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.util.*;
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.mapred.*;
>
> import org.apache.hadoop.hbase.*;
> import org.apache.hadoop.hbase.mapred.*;
> import org.apache.hadoop.hbase.client.*;
> import org.apache.hadoop.hbase.client.Scanner;
> import org.apache.hadoop.hbase.io.*;
> import org.apache.hadoop.hbase.util.*;
>
> //
> // Attempt to fix the localized nature of table segments.
> // Compute table splits so that they are processed locally.
> // Combine multiple splits to avoid the number of splits exceeding
> // numSplits.
> // Sort the resulting splits so that the shortest ones are processed last.
> // The resulting savings in network bandwidth are significant (we measured
> // 60%).
> //
> public class TableInputFormatFix extends TableInputFormat
> {
>    public static final int ORIGINAL    = 0;
>    public static final int LOCALIZED    = 1;
>    public static final int OPTIMIZED    = 2;    // not yet functional
>
>    //
>    // A table split with a location.
>    //
>    static class LocationTableSplit extends TableSplit implements 
> Comparable
>    {
>    String location;
>
>    public LocationTableSplit()
>    {
>    }
>    public LocationTableSplit(byte [] tableName, byte [] startRow, byte []
> endRow, String location)
>    {
>        super(tableName, startRow, endRow);
>        this.location = location;
>    }
>    public String[] getLocations()
>    {
>        return new String[] {location};
>    }
>    public void readFields(DataInput in) throws IOException
>    {
>        super.readFields(in);
>        this.location = Bytes.toString(Bytes.readByteArray(in));
>    }
>    public void write(DataOutput out) throws IOException
>    {
>        super.write(out);
>        Bytes.writeByteArray(out, Bytes.toBytes(location));
>    }
>    public int compareTo(Object other)
>    {
>        LocationTableSplit otherSplit = (LocationTableSplit)other;
>        int result = Bytes.compareTo(getStartRow(),
> otherSplit.getStartRow());
>        return result;
>    }
>    public String toString()
>    {
>        return location.substring(0, location.indexOf('.')) + ": " +
> Bytes.toString(getStartRow()) + "-" + Bytes.toString(getEndRow());
>    }
>    }
>
>    //
>    // A table split with a location that covers multiple regions.
>    //
>    static class MultiRegionTableSplit extends LocationTableSplit
>    {
>    byte[][] regions;
>
>    public MultiRegionTableSplit()
>    {
>    }
>    public MultiRegionTableSplit(byte[] tableName, String location, 
> byte[][]
> regions) throws IOException
>    {
>        super(tableName, regions[0], regions[regions.length-1], location);
>        this.location = location;
>        this.regions = regions;
>    }
>    public void readFields(DataInput in) throws IOException
>    {
>        super.readFields(in);
>        int n = in.readInt();
>        regions = new byte[n][];
>        for (int i = 0 ; i < n ; i++) {
>        regions[i] = Bytes.readByteArray(in);
>        }
>    }
>    public void write(DataOutput out) throws IOException
>    {
>        super.write(out);
>        out.writeInt(regions.length);
>        for (int i = 0 ; i < regions.length ; i++) {
>        Bytes.writeByteArray(out, regions[i]);
>        }
>    }
>    public String toString()
>    {
>        String str = location.substring(0, location.indexOf('.')) + ": ";
>        for (int i = 0 ; i < regions.length ; i += 2) {
>        if (i > 0) {
>            str += ", ";
>        }
>        str += Bytes.toString(regions[i]) + "-" +
> Bytes.toString(regions[i+1]);
>        }
>        return str;
>    }
>    public int compareTo(Object other)
>    {
>        MultiRegionTableSplit otherSplit = (MultiRegionTableSplit)other;
>        int result = otherSplit.regions.length - regions.length;
>        if (result == 0) {
>        result = Bytes.compareTo(getStartRow(), otherSplit.getStartRow());
>        }
>        return result;
>    }
>    }
>
>    //
>    // TableRecordReader that can handle multiple regions.
>    //
>    protected class MultiRegionTableRecordReader implements
> RecordReader<ImmutableBytesWritable, RowResult>
>    {
>    private HTable htable;
>    private byte [][] trrInputColumns;
>
>    private int currentregion;
>    private byte[][] regions;
>    private byte [] lastRow;
>    private Scanner scanner;
>
>    void setHTable(HTable htable)
>    {
>        this.htable = htable;
>    }
>    void setInputColumns(final byte [][] inputColumns)
>    {
>        this.trrInputColumns = inputColumns;
>    }
>    void setRegions(byte[][] regions)
>    {
>        this.regions = regions;
>    }
>
>    int getRegion(byte[] row)
>    {
>        for (int i = 0 ; i < regions.length ; i += 2) {
>        byte[] startRow = regions[i + 0];
>        byte[] endRow = regions[i + 1];
>        if (startRow.length > 0 && Bytes.compareTo(startRow, row) > 0) {
>            continue;
>        }
>        if (endRow.length > 0 && Bytes.compareTo(row, endRow) >= 0) {
>            continue;
>        }
>        return i/2;
>        }
>        return -1;
>    }
>
>    //
>    // The buisiness end the the reader
>    //
>    public void init() throws IOException
>    {
>        restart(regions[0]);
>    }
>    public void restart(byte[] row) throws IOException
>    {
>        currentregion = getRegion(row);
>        byte[] startRow = regions[currentregion*2 + 0];
>        byte[] endRow = regions[currentregion*2 + 1];
>        if (endRow.length > 0) {
>        scanner = htable.getScanner(trrInputColumns, startRow, endRow);
>        } else {
>        scanner = htable.getScanner(trrInputColumns, startRow);
>        }
>    }
>
>    public ImmutableBytesWritable createKey()
>    {
>        return new ImmutableBytesWritable();
>    }
>    public RowResult createValue()
>    {
>        return new RowResult();
>    }
>
>    public long getPos()
>    {
>        return 0;
>    }
>    public float getProgress()
>    {
>        int nregions = regions.length/2;
>        return ((100 * (2 * currentregion + 1)) / (2 * nregions)) / 100f;
>    }
>
>    public boolean next(ImmutableBytesWritable key, RowResult value) throws
> IOException
>    {
>        while (true) {
>        RowResult result;
>        try {
>            result = scanner.next();
>        } catch (UnknownScannerException e) {
>            restart(lastRow);
>            scanner.next();    // skip presumed already mapped row
>            result = scanner.next();
>        }
>        boolean hasMore = result != null && result.size() > 0;
>        if (!hasMore && currentregion+1 < regions.length/2) {
>            // move to the next region
>            restart(regions[(currentregion+1)*2]);
>            continue;
>        }
>        if (hasMore) {
>            key.set(result.getRow());
>            lastRow = key.get();
>            Writables.copyWritable(result, value);
>        }
>        return hasMore;
>        }
>    }
>
>    public void close()
>    {
>        scanner.close();
>    }
>    }
>
>    //
>    // Main class
>    //
>
>    int type;
>    HTable table;
>    byte[][] inputColumns;
>    MultiRegionTableRecordReader reader;
>
>    /** Creates an input format that uses the OPTIMIZED split type. */
>    public TableInputFormatFix() {
>        this(OPTIMIZED);
>    }
>    /**
>     * Creates an input format with the given split type and a fresh
>     * multi-region record reader.
>     *
>     * @param type one of ORIGINAL, LOCALIZED or OPTIMIZED
>     */
>    public TableInputFormatFix(int type) {
>        this.reader = new MultiRegionTableRecordReader();
>        this.type = type;
>    }
>
>    protected void setHTable(HTable table)
>    {
>    this.table = table;
>    super.setHTable(table);
>    }
>    protected void setInputColums(byte [][] inputColumns)
>    {
>    this.inputColumns = inputColumns;
>    super.setInputColums(inputColumns);
>    }
>
>    //
>    // Create RecordReader
>    //
>    public RecordReader<ImmutableBytesWritable, RowResult>
> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws
> IOException
>    {
>    TableSplit tSplit = (TableSplit) split;
>    MultiRegionTableRecordReader trr = reader;
>    trr.setHTable(this.table);
>    trr.setInputColumns(this.inputColumns);
>    //trr.setRowFilter(this.rowFilter);
>    if (tSplit instanceof MultiRegionTableSplit) {
>        trr.setRegions(((MultiRegionTableSplit)tSplit).regions);
>    } else {
>        trr.setRegions(new byte[][] {tSplit.getStartRow(),
> tSplit.getEndRow()});
>    }
>    trr.init();
>    return trr;
>    }
>
>    //
>    // Compute the splits.
>    //
>    public InputSplit[] getSplits(JobConf job, int numSplits) throws
> IOException
>    {
>    InputSplit[] splits = null;
>    byte [][] startKeys = this.table.getStartKeys();
>    if (startKeys == null || startKeys.length == 0) {
>        throw new IOException("Expecting at least one region");
>    }
>    if (this.table == null) {
>        throw new IOException("No table was provided");
>    }
>    if (this.inputColumns == null || this.inputColumns.length == 0) {
>        throw new IOException("Expecting at least one column");
>    }
>
>    switch (type) {
>    case ORIGINAL:
>        {
>        // This is the original algorithm with no locations.
>        int realNumSplits = numSplits > startKeys.length ? startKeys.length
> : numSplits;
>        splits = new InputSplit[realNumSplits];
>        int middle = startKeys.length / realNumSplits;
>        int startPos = 0;
>        for (int i = 0; i < realNumSplits; i++) {
>            int lastPos = startPos + middle;
>            lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 :
> lastPos;
>            splits[i] = new TableSplit(this.table.getTableName(),
> startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos] :
> HConstants.EMPTY_START_ROW);
>
>            startPos = lastPos;
>        }
>        break;
>        }
>    case LOCALIZED:
>        {
>        // This is the original algorithm with a minor fix for adding the
> likely location of each region.
>        int realNumSplits = numSplits > startKeys.length ? startKeys.length
> : numSplits;
>        splits = new InputSplit[realNumSplits];
>        int middle = startKeys.length / realNumSplits;
>        int startPos = 0;
>        for (int i = 0; i < realNumSplits; i++) {
>            int lastPos = startPos + middle;
>            lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 :
> lastPos;
>            String regionLocation =
> table.getRegionLocation(startKeys[startPos]).getServerAddress().getHostname();
>            splits[i] = new LocationTableSplit(this.table.getTableName(),
> startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos] :
> HConstants.EMPTY_START_ROW, regionLocation);
>
>            startPos = lastPos;
>        }
>        break;
>        }
>    case OPTIMIZED:
>        {
>        // This is an optimized algorithm that bonds multiple regions
> together in each split.
>        int nregions = 0;
>        int nhosts = 0;
>        HashMap<String, ArrayList<HRegionInfo>> hosts = new HashMap<String,
> ArrayList<HRegionInfo>>();
>        for (java.util.Map.Entry<HRegionInfo,HServerAddress> e :
> table.getRegionsInfo().entrySet()) {
>            String host = e.getValue().getHostname();
>            ArrayList<HRegionInfo> regions = hosts.get(host);
>            if (regions == null) {
>            regions = new ArrayList<HRegionInfo>();
>            hosts.put(host, regions);
>            nhosts++;
>            }
>            regions.add(e.getKey());
>            nregions++;
>        }
>        if (numSplits < nhosts) {
>            numSplits = nhosts;
>        }
>        if (numSplits > nregions) {
>            numSplits = nregions;
>        }
>        float sph = (float)numSplits/nhosts;
>        float sphremainder = 0f;
>
>        ArrayList<InputSplit> splitlist = new ArrayList<InputSplit>();
>        for (String host : hosts.keySet()) {
>            ArrayList<HRegionInfo> regions = hosts.get(host);
>            float rps = ((regions.size() - 1) + sphremainder) / (sph - 1);
>            sphremainder = sph;
>            float rpsremainder = 0f;
>            for (int i = 0 ; i < regions.size() ;) {
>            rpsremainder += rps;
>            int splitSize = Math.max(1, (int)rpsremainder);
>            if (i + splitSize > regions.size()) {
>                splitSize = regions.size() - i;
>            }
>            //System.out.println(host + ": " + numSplits + "/" + nregions +
> "/" + splitSize + ":");
>            byte[][] splitregions = new byte[splitSize*2][];
>            for (int j = 0 ; j < splitSize ; j++) {
>                HRegionInfo region = regions.get(i + j);
>                splitregions[j*2 + 0] = region.getStartKey();
>                splitregions[j*2 + 1] = region.getEndKey();
>            }
>            splitlist.add(new MultiRegionTableSplit(table.getTableName(),
> host, splitregions));
>
>            i += splitSize;
>            rpsremainder -= splitSize;
>            sphremainder -= 1;
>            }
>        }
>
>        // copy into a real array (there must be a better way)
>        int n = splitlist.size();
>        //n = 1;
>        splits = new InputSplit[n];
>        for (int i = splits.length ; i-- > 0 ;) {
>            splits[i] = splitlist.get(i);
>        }
>        Arrays.sort(splits);
>        break;
>        }
>    }
>
>    // dump the splits
>    if (false) {
>        System.out.println("---- " + splits.length + " splits ----");
>        for (int i = 0 ; i < splits.length ; i++) {
>        System.out.println(i + ": " + splits[i].toString());
>        }
>        System.exit(1);
>    }
>
>    // return the splits
>    return splits;
>    }
> }
> 



Mime
View raw message