hadoop-common-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Arthur van Hoff" <...@ellerdale.com>
Subject Improving locality of table access...
Date Wed, 22 Oct 2008 22:14:23 GMT
Hi,

Below is some code for improving the read performance of large tables by
processing each region on the host holding that region. We measured 50-60%
lower network bandwidth.

To use this class instead of org.apache.hadoop.hbase.mapred.TableInputFormat
class use:

    jobconf.setInputFormat(ellerdale.mapreduce.TableInputFormatFix);

Please send me feedback, if you can think off better ways to do this.

-- 
Arthur van Hoff - Grand Master of Alphabetical Order
The Ellerdale Project, Menlo Park, CA
avh@ellerdale.com, 650-283-0842


-- TableInputFormatFix.java --

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// Author: Arthur van Hoff, avh@ellerdale.com

package ellerdale.mapreduce;

import java.io.*;
import java.util.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;

import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.mapred.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.util.*;

//
// Attempt to fix the localized nature of table segments.
// Compute table splits so that they are processed locally.
// Combine multiple splits to avoid the number of splits exceeding
numSplits.
// Sort the resulting splits so that the shortest ones are processed last.
// The resulting savings in network bandwidth are significant (we measured
60%).
//
public class TableInputFormatFix extends TableInputFormat
{
    public static final int ORIGINAL    = 0;
    public static final int LOCALIZED    = 1;
    public static final int OPTIMIZED    = 2;    // not yet functional

    //
    // A table split with a location.
    //
    static class LocationTableSplit extends TableSplit implements Comparable
    {
    String location;

    public LocationTableSplit()
    {
    }
    public LocationTableSplit(byte [] tableName, byte [] startRow, byte []
endRow, String location)
    {
        super(tableName, startRow, endRow);
        this.location = location;
    }
    public String[] getLocations()
    {
        return new String[] {location};
    }
    public void readFields(DataInput in) throws IOException
    {
        super.readFields(in);
        this.location = Bytes.toString(Bytes.readByteArray(in));
    }
    public void write(DataOutput out) throws IOException
    {
        super.write(out);
        Bytes.writeByteArray(out, Bytes.toBytes(location));
    }
    public int compareTo(Object other)
    {
        LocationTableSplit otherSplit = (LocationTableSplit)other;
        int result = Bytes.compareTo(getStartRow(),
otherSplit.getStartRow());
        return result;
    }
    public String toString()
    {
        return location.substring(0, location.indexOf('.')) + ": " +
Bytes.toString(getStartRow()) + "-" + Bytes.toString(getEndRow());
    }
    }

    //
    // A table split with a location that covers multiple regions.
    //
    static class MultiRegionTableSplit extends LocationTableSplit
    {
    byte[][] regions;

    public MultiRegionTableSplit()
    {
    }
    public MultiRegionTableSplit(byte[] tableName, String location, byte[][]
regions) throws IOException
    {
        super(tableName, regions[0], regions[regions.length-1], location);
        this.location = location;
        this.regions = regions;
    }
    public void readFields(DataInput in) throws IOException
    {
        super.readFields(in);
        int n = in.readInt();
        regions = new byte[n][];
        for (int i = 0 ; i < n ; i++) {
        regions[i] = Bytes.readByteArray(in);
        }
    }
    public void write(DataOutput out) throws IOException
    {
        super.write(out);
        out.writeInt(regions.length);
        for (int i = 0 ; i < regions.length ; i++) {
        Bytes.writeByteArray(out, regions[i]);
        }
    }
    public String toString()
    {
        String str = location.substring(0, location.indexOf('.')) + ": ";
        for (int i = 0 ; i < regions.length ; i += 2) {
        if (i > 0) {
            str += ", ";
        }
        str += Bytes.toString(regions[i]) + "-" +
Bytes.toString(regions[i+1]);
        }
        return str;
    }
    public int compareTo(Object other)
    {
        MultiRegionTableSplit otherSplit = (MultiRegionTableSplit)other;
        int result = otherSplit.regions.length - regions.length;
        if (result == 0) {
        result = Bytes.compareTo(getStartRow(), otherSplit.getStartRow());
        }
        return result;
    }
    }

    //
    // TableRecordReader that can handle multiple regions.
    //
    protected class MultiRegionTableRecordReader implements
RecordReader<ImmutableBytesWritable, RowResult>
    {
    private HTable htable;
    private byte [][] trrInputColumns;

    private int currentregion;
    private byte[][] regions;
    private byte [] lastRow;
    private Scanner scanner;

    void setHTable(HTable htable)
    {
        this.htable = htable;
    }
    void setInputColumns(final byte [][] inputColumns)
    {
        this.trrInputColumns = inputColumns;
    }
    void setRegions(byte[][] regions)
    {
        this.regions = regions;
    }

    int getRegion(byte[] row)
    {
        for (int i = 0 ; i < regions.length ; i += 2) {
        byte[] startRow = regions[i + 0];
        byte[] endRow = regions[i + 1];
        if (startRow.length > 0 && Bytes.compareTo(startRow, row) > 0) {
            continue;
        }
        if (endRow.length > 0 && Bytes.compareTo(row, endRow) >= 0) {
            continue;
        }
        return i/2;
        }
        return -1;
    }

    //
    // The buisiness end the the reader
    //
    public void init() throws IOException
    {
        restart(regions[0]);
    }
    public void restart(byte[] row) throws IOException
    {
        currentregion = getRegion(row);
        byte[] startRow = regions[currentregion*2 + 0];
        byte[] endRow = regions[currentregion*2 + 1];
        if (endRow.length > 0) {
        scanner = htable.getScanner(trrInputColumns, startRow, endRow);
        } else {
        scanner = htable.getScanner(trrInputColumns, startRow);
        }
    }

    public ImmutableBytesWritable createKey()
    {
        return new ImmutableBytesWritable();
    }
    public RowResult createValue()
    {
        return new RowResult();
    }

    public long getPos()
    {
        return 0;
    }
    public float getProgress()
    {
        int nregions = regions.length/2;
        return ((100 * (2 * currentregion + 1)) / (2 * nregions)) / 100f;
    }

    public boolean next(ImmutableBytesWritable key, RowResult value) throws
IOException
    {
        while (true) {
        RowResult result;
        try {
            result = scanner.next();
        } catch (UnknownScannerException e) {
            restart(lastRow);
            scanner.next();    // skip presumed already mapped row
            result = scanner.next();
        }
        boolean hasMore = result != null && result.size() > 0;
        if (!hasMore && currentregion+1 < regions.length/2) {
            // move to the next region
            restart(regions[(currentregion+1)*2]);
            continue;
        }
        if (hasMore) {
            key.set(result.getRow());
            lastRow = key.get();
            Writables.copyWritable(result, value);
        }
        return hasMore;
        }
    }

    public void close()
    {
        scanner.close();
    }
    }

    //
    // Main class
    //

    int type;
    HTable table;
    byte[][] inputColumns;
    MultiRegionTableRecordReader reader;

    public TableInputFormatFix()
    {
    this(OPTIMIZED);
    }
    public TableInputFormatFix(int type)
    {
    this.type = type;
    this.reader = new MultiRegionTableRecordReader();
    }

    protected void setHTable(HTable table)
    {
    this.table = table;
    super.setHTable(table);
    }
    protected void setInputColums(byte [][] inputColumns)
    {
    this.inputColumns = inputColumns;
    super.setInputColums(inputColumns);
    }

    //
    // Create RecordReader
    //
    public RecordReader<ImmutableBytesWritable, RowResult>
getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws
IOException
    {
    TableSplit tSplit = (TableSplit) split;
    MultiRegionTableRecordReader trr = reader;
    trr.setHTable(this.table);
    trr.setInputColumns(this.inputColumns);
    //trr.setRowFilter(this.rowFilter);
    if (tSplit instanceof MultiRegionTableSplit) {
        trr.setRegions(((MultiRegionTableSplit)tSplit).regions);
    } else {
        trr.setRegions(new byte[][] {tSplit.getStartRow(),
tSplit.getEndRow()});
    }
    trr.init();
    return trr;
    }

    //
    // Compute the splits.
    //
    public InputSplit[] getSplits(JobConf job, int numSplits) throws
IOException
    {
    InputSplit[] splits = null;
    byte [][] startKeys = this.table.getStartKeys();
    if (startKeys == null || startKeys.length == 0) {
        throw new IOException("Expecting at least one region");
    }
    if (this.table == null) {
        throw new IOException("No table was provided");
    }
    if (this.inputColumns == null || this.inputColumns.length == 0) {
        throw new IOException("Expecting at least one column");
    }

    switch (type) {
    case ORIGINAL:
        {
        // This is the original algorithm with no locations.
        int realNumSplits = numSplits > startKeys.length ? startKeys.length
: numSplits;
        splits = new InputSplit[realNumSplits];
        int middle = startKeys.length / realNumSplits;
        int startPos = 0;
        for (int i = 0; i < realNumSplits; i++) {
            int lastPos = startPos + middle;
            lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 :
lastPos;
            splits[i] = new TableSplit(this.table.getTableName(),
startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos] :
HConstants.EMPTY_START_ROW);

            startPos = lastPos;
        }
        break;
        }
    case LOCALIZED:
        {
        // This is the original algorithm with a minor fix for adding the
likely location of each region.
        int realNumSplits = numSplits > startKeys.length ? startKeys.length
: numSplits;
        splits = new InputSplit[realNumSplits];
        int middle = startKeys.length / realNumSplits;
        int startPos = 0;
        for (int i = 0; i < realNumSplits; i++) {
            int lastPos = startPos + middle;
            lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 :
lastPos;
            String regionLocation =
table.getRegionLocation(startKeys[startPos]).getServerAddress().getHostname();
            splits[i] = new LocationTableSplit(this.table.getTableName(),
startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos] :
HConstants.EMPTY_START_ROW, regionLocation);

            startPos = lastPos;
        }
        break;
        }
    case OPTIMIZED:
        {
        // This is an optimized algorithm that bonds multiple regions
together in each split.
        int nregions = 0;
        int nhosts = 0;
        HashMap<String, ArrayList<HRegionInfo>> hosts = new HashMap<String,
ArrayList<HRegionInfo>>();
        for (java.util.Map.Entry<HRegionInfo,HServerAddress> e :
table.getRegionsInfo().entrySet()) {
            String host = e.getValue().getHostname();
            ArrayList<HRegionInfo> regions = hosts.get(host);
            if (regions == null) {
            regions = new ArrayList<HRegionInfo>();
            hosts.put(host, regions);
            nhosts++;
            }
            regions.add(e.getKey());
            nregions++;
        }
        if (numSplits < nhosts) {
            numSplits = nhosts;
        }
        if (numSplits > nregions) {
            numSplits = nregions;
        }
        float sph = (float)numSplits/nhosts;
        float sphremainder = 0f;

        ArrayList<InputSplit> splitlist = new ArrayList<InputSplit>();
        for (String host : hosts.keySet()) {
            ArrayList<HRegionInfo> regions = hosts.get(host);
            float rps = ((regions.size() - 1) + sphremainder) / (sph - 1);
            sphremainder = sph;
            float rpsremainder = 0f;
            for (int i = 0 ; i < regions.size() ;) {
            rpsremainder += rps;
            int splitSize = Math.max(1, (int)rpsremainder);
            if (i + splitSize > regions.size()) {
                splitSize = regions.size() - i;
            }
            //System.out.println(host + ": " + numSplits + "/" + nregions +
"/" + splitSize + ":");
            byte[][] splitregions = new byte[splitSize*2][];
            for (int j = 0 ; j < splitSize ; j++) {
                HRegionInfo region = regions.get(i + j);
                splitregions[j*2 + 0] = region.getStartKey();
                splitregions[j*2 + 1] = region.getEndKey();
            }
            splitlist.add(new MultiRegionTableSplit(table.getTableName(),
host, splitregions));

            i += splitSize;
            rpsremainder -= splitSize;
            sphremainder -= 1;
            }
        }

        // copy into a real array (there must be a better way)
        int n = splitlist.size();
        //n = 1;
        splits = new InputSplit[n];
        for (int i = splits.length ; i-- > 0 ;) {
            splits[i] = splitlist.get(i);
        }
        Arrays.sort(splits);
        break;
        }
    }

    // dump the splits
    if (false) {
        System.out.println("---- " + splits.length + " splits ----");
        for (int i = 0 ; i < splits.length ; i++) {
        System.out.println(i + ": " + splits[i].toString());
        }
        System.exit(1);
    }

    // return the splits
    return splits;
    }
}

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message