hbase-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Naama Kraus" <naamakr...@gmail.com>
Subject Re: Map Reduce over HBase - sample code
Date Thu, 03 Jul 2008 07:08:41 GMT
Hi,

Here is a version following the last comments.

Naama

/**
 * <pre>
 * 'Toy tables' for experimenting with MapReduce over HBase
 *
 * grades table - a HBase table of the form -
 * raw id is a student name
 * column name is Course:course_name
 * cell value is the student's grade in the course 'course_name'
 *
 * Exmaple:
 *
 *         Course:Math  |  Course:Art  |  Course:Sports
 *         ----------------------------------------------
 * Dan        87                97              99
 * Dana       100               100             80
 *
 * =======================================
 *
 * courses table - a HBase table of the form -
 * raw id is a course name
 * column name is Stats:Average
 * cell value is the average grade in that course, computed by a map reduce
job
 *
 * Exmaple:
 *
 *            Stats:Average
 *            --------------
 *  Art          86
 *  Match        77
 * </pre>
 * @see GradesTableMapReduce
 *
 *
 */
public class GradesTable {

  public static final String GRADES_TABLE_NAME = "grades";
  public static final String COURSE_TABLE_NAME = "courses";
  public static final String COURSE_FAMILY = "Course:";
  // A column family holding grades statistics
  public static final String STATS_FAMILY = "Stats:";
  // A column member holding average grade in course
  public static final String AVG = "Average";

  private static final String [] STUDENT_NAMES = {
    "Dan", "Dana", "Sara", "David"
  };

  private static final String [] COURSE_NAMES = {
    "Math", "Art", "Sports"
  };

  private HBaseConfiguration conf;
  private HBaseAdmin admin;
  private HTableDescriptor grades_desc;
  private HTableDescriptor courses_desc;
  // Randomly generate a grade
  private Random rand;

  private static final Log LOG =
LogFactory.getLog(GradesTable.class.getName());

  public GradesTable() throws IOException {
    conf = new HBaseConfiguration();
    admin = new HBaseAdmin(conf);
    grades_desc = new HTableDescriptor(GRADES_TABLE_NAME);
    courses_desc = new HTableDescriptor(COURSE_TABLE_NAME);
    rand = new Random();
  }

  /**
   * Create tables and populate with content
   */
  public void create() throws IOException {
    grades_desc.addFamily(new HColumnDescriptor(COURSE_FAMILY));
    courses_desc.addFamily(new HColumnDescriptor(STATS_FAMILY));
    admin.createTable(grades_desc);
    admin.createTable(courses_desc);
    LOG.info("Tables created");

    // Populate grades table with students and their grades in courses
    HTable table = new HTable(conf, new Text(GRADES_TABLE_NAME));

    // Start an update transaction, student name is row id
    for (int i = 0; i < STUDENT_NAMES.length; i++) {
      LOG.info("<<< Row " + i + ", student: " + STUDENT_NAMES[i] + " >>>");
      Text stuName = new Text(STUDENT_NAMES[i]);
      long writeid = table.startUpdate(stuName);
      for (int j = 0; j < COURSE_NAMES.length; j++) {
        Text courseColumn = new Text(COURSE_FAMILY + COURSE_NAMES[j]);
        // Put a cell with a student's grade in this course
        int grade = Math.abs(rand.nextInt()) % 101;
        table.put(writeid, courseColumn, new IntWritable(grade));
        LOG.info("Course: " + COURSE_NAMES[j] + ", grade: " + grade);
      }
      table.commit(writeid);
    }
    LOG.info("Grades Table populated");
  }

  public static void main(String [] args) {
    try {
      GradesTable gradesTable = new GradesTable();
      gradesTable.create();
    } catch (IOException e) {
      LOG.fatal("An exception occured", e);
    }
}

=========================================================

/**
 * A map reduce job over {@link GradesTable}.
 * The job produces for each course the average grade in that course.
 * It puts the average in a separate table which holds course statistics.
 */
public class GradesTableMapReduce extends Configured implements Tool {

  /**
   * Map a row to {key, value} pairs.
   * Emit a {course, grade} pair for each course grade appearing in the
   * student row.
   * E.g. Sara {Math:62, Art:45, Sports:87} -> {Math, 62}, {Art, 45},
   * {Sports, 87}
   */
  public static class GradesTableMap extends TableMap<Text, IntWritable> {

    /**
     * @param key row key of the student row being mapped (not used here)
     * @param value the row's cells, keyed by full column name
     * @param output collects one {course, grade} pair per cell
     * @param reporter progress reporter (not used here)
     * @throws IOException if a cell cannot be decoded or collected
     */
    @Override
    public void map(HStoreKey key, MapWritable value,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {

      // Walk through the columns
      for (Map.Entry<Writable, Writable> e : value.entrySet()) {
        // Column name is family + course name; remove the family prefix
        Text column = (Text) e.getKey();
        Text course = new Text(HStoreKey.extractQualifier(column).toString());

        // The cell holds the bytes of a serialized IntWritable
        byte [] gradeInBytes = ((ImmutableBytesWritable) e.getValue()).get();
        IntWritable grade = new IntWritable();
        Writables.getWritable(gradeInBytes, grade);

        // Emit course name and a grade
        output.collect(course, grade);
      }
    }
  }

  /**
   * Reduce - compute an average of key's values which is actually the
   * average grade in each course.
   * E.g. {Math, {62, 45, 87}} -> {Math, 64.67}
   */
  public static class GradesTableReduce
      extends TableReduce<Text, IntWritable> {

    /**
     * @param key the course name
     * @param values the grades collected for that course
     * @param output collects the row update for the courses table
     * @param reporter progress reporter (not used here)
     * @throws IOException if the average cannot be serialized or collected
     */
    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, MapWritable> output, Reporter reporter)
        throws IOException {
      // Compute grades average
      int count = 0;
      long sum = 0;
      while (values.hasNext()) {
        count++;
        sum += values.next().get();
      }
      if (count == 0) {
        // No grades for this course, hence no average to store
        return;
      }
      // Divide in floating point; integer division would silently drop
      // the fractional part of the average before it is stored.
      float average = (float) sum / count;

      // We put the average as a separate column in the courses table
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(baos);
      FloatWritable avgWritable = new FloatWritable(average);
      avgWritable.write(out);
      MapWritable map = new MapWritable();
      map.put(new Text(GradesTable.STATS_FAMILY + GradesTable.AVG),
              new ImmutableBytesWritable(baos.toByteArray()));
      output.collect(key, map);
    }
  }

  /**
   * Configure and run the job: map over the grades table, reduce into the
   * courses table.
   *
   * @param args command line arguments (not used here)
   * @return 0 once the job has been run
   * @throws Exception if the job fails
   */
  public int run(String[] args) throws Exception {
    // Start from the configuration injected through setConf (e.g. by
    // ToolRunner) so that generic options are honored; fall back to the
    // defaults when run() is invoked on an unconfigured instance.
    Configuration base = getConf();
    JobConf jobConf = (base == null)
        ? new JobConf(GradesTableMapReduce.class)
        : new JobConf(base, GradesTableMapReduce.class);
    jobConf.setJobName("compute average grades");
    jobConf.setNumReduceTasks(1);

    // All columns in the course family (i.e. all grades) get into the map
    TableMap.initJob(GradesTable.GRADES_TABLE_NAME,
        GradesTable.COURSE_FAMILY, GradesTableMap.class, jobConf);

    // Reduce output (course average grade) is put in the courses table
    TableReduce.initJob(GradesTable.COURSE_TABLE_NAME,
        GradesTableReduce.class, jobConf);

    // Map produces a value which is an IntWritable
    jobConf.setMapOutputValueClass(IntWritable.class);

    JobClient.runJob(jobConf);
    return 0;
  }

  /**
   * Run the job through ToolRunner and exit with the tool's return code.
   *
   * @param args command line arguments, handed to ToolRunner
   * @throws Exception if the job fails
   */
  public static void main(String [] args) throws Exception {
    int exitCode =
        ToolRunner.run(new Configuration(), new GradesTableMapReduce(), args);
    System.exit(exitCode);
  }
}

On Thu, Jul 3, 2008 at 9:44 AM, Naama Kraus <naamakraus@gmail.com> wrote:

> Thanks St.Ack for the further comments and for putting a link in the Wiki.
> Naama
>
>
> On Tue, Jul 1, 2008 at 8:38 PM, stack <stack@duboce.net> wrote:
>
>> Comments in-line below:
>>
>> Naama Kraus wrote:
>>
>>> Here is the updated code
>>>
>>> Naama
>>>
>>> /**
>>>  * <pre>
>>>  * 'Toy tables' for experiencing with MapReduce over HBase
>>>
>>>
>> Do you mean 'experimenting' in the above?
>>
>> ....
>>
>>>  public void create() throws IOException {
>>>
>>>
>>
>> Where does this method get called?  I don't see it.
>>
>>
>>     System.out.println("Grades Table populated");
>>>
>>>
>>
>> Do you want to set up a logger to do the outputting instead?  See the head
>> of (most) hbase classes for example.  Look for 'LOG'.
>>
>>
>>   }
>>> }
>>>
>>>
>>> ====================================================
>>>
>>> /**
>>>  * A map reduce job over {@link GradesTable}
>>>  * The job produces for each course the average grade in that course.
>>>  * It puts the average in a separate table which holds course statistics.
>>>  *
>>>  */
>>> public class GradesTableMapReduce  extends Configured implements Tool {
>>>
>>>  /**
>>>   * Map a row to {key, value} pairs.
>>>   * Emit a {course, grade} pair for each course grade appearing in the
>>> student row.
>>>   * E.g. Sara {Math:62, Art:45, Sports:87} -> {Math, 62}, {Art, 45},
>>> {Sports, 87}
>>>   *
>>>   */
>>>  public static class GradesTableMap extends TableMap<Text, IntWritable>
{
>>>
>>>    @Override
>>>    public void map(HStoreKey key, MapWritable value,
>>>        OutputCollector<Text, IntWritable> output, Reporter reporter)
>>> throws
>>> IOException {
>>>
>>>      // Walk through the columns
>>>      for (Map.Entry<Writable, Writable> e: value.entrySet()) {
>>>        // Column name is course name
>>>        Text course = (Text) e.getKey();
>>>        // Remove the family prefix
>>>        String courseStr = course.toString();
>>>        courseStr =
>>>          courseStr.substring(courseStr.indexOf(':') + 1);
>>>
>>>
>>
>> There may be a utility in HStoreKey to do the above stripping of the column
>> family (getQualifier?).
>>
>>         course = new Text(courseStr);
>>>        byte [] gradeInBytes = ((ImmutableBytesWritable)
>>> e.getValue()).get();
>>>        DataInputStream in = new DataInputStream(new
>>> ByteArrayInputStream(gradeInBytes));
>>>        IntWritable grade = new IntWritable();
>>>        grade.readFields(in);
>>>
>>>
>>
>> You could have used Writables.getWritable above and saved yourself a few
>> lines (Not important).
>>
>> Otherwise, this class is an excellent example of using MR + HBase.  I've
>> added a pointer to it up on the wiki under the MR+HBase page (update the link
>> if you update your code).
>>
>> Thanks,
>> St.Ack
>>
>
>
>
> --
> oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo
> 00 oo 00 oo
> "If you want your children to be intelligent, read them fairy tales. If you
> want them to be more intelligent, read them more fairy tales." (Albert
> Einstein)
>



-- 
oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo
00 oo 00 oo
"If you want your children to be intelligent, read them fairy tales. If you
want them to be more intelligent, read them more fairy tales." (Albert
Einstein)

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message