hbase-commits mailing list archives

From st...@apache.org
Subject svn commit: r923404 - in /hadoop/hbase/trunk: ./ core/src/main/java/org/apache/hadoop/hbase/mapred/ core/src/main/java/org/apache/hadoop/hbase/mapreduce/
Date Mon, 15 Mar 2010 19:37:49 GMT
Author: stack
Date: Mon Mar 15 19:37:48 2010
New Revision: 923404

URL: http://svn.apache.org/viewvc?rev=923404&view=rev
Log:
HBASE-2324 Refactoring of TableRecordReader (mapred / mapreduce) for reuse outside the scope
of InputSplit / RecordReader 
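
The scanning logic now lives in standalone *Impl classes that can be driven
directly, with no InputFormat, InputSplit, or RecordReader around them. A
minimal sketch of that reuse, against the new mapreduce TableRecordReaderImpl
added below (hedged: the table name, column family, and driver class are
illustrative, not part of this commit):

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
    import org.apache.hadoop.hbase.util.Bytes;

    // Hypothetical driver: scans a table using only the new impl class.
    public class ScanWithoutMapReduce {
      public static void main(String[] args) throws Exception {
        HTable table = new HTable("mytable");      // hypothetical table
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("info"));     // hypothetical family
        TableRecordReaderImpl trr = new TableRecordReaderImpl();
        trr.setHTable(table);
        trr.setScan(scan);
        trr.init();                                // builds the scanner
        while (trr.nextKeyValue()) {
          ImmutableBytesWritable row = trr.getCurrentKey();
          Result result = trr.getCurrentValue();
          // process row / result here
        }
        trr.close();
      }
    }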

Added:
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=923404&r1=923403&r2=923404&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Mar 15 19:37:48 2010
@@ -434,6 +434,8 @@ Release 0.21.0 - Unreleased
                (Kay Kay via Stack)
    HBASE-2279  Hbase Shell does not have any tests (Alexey Kovyrin via Stack)
    HBASE-2314  [shell] Support for getting counters (Alexey Kovyrin via Stack)
+   HBASE-2324  Refactoring of TableRecordReader (mapred / mapreduce) for reuse
+               outside the scope of InputSplit / RecordReader (Kay Kay via Stack)
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java?rev=923404&r1=923403&r2=923404&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java Mon Mar 15 19:37:48 2010
@@ -24,23 +24,16 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a
@@ -81,157 +74,6 @@ implements InputFormat<ImmutableBytesWri
   private Filter rowFilter;
 
   /**
-   * Iterate over an HBase table data, return (Text, RowResult) pairs
-   */
-  protected class TableRecordReader
-  implements RecordReader<ImmutableBytesWritable, Result> {
-    private byte [] startRow;
-    private byte [] endRow;
-    private byte [] lastRow;
-    private Filter trrRowFilter;
-    private ResultScanner scanner;
-    private HTable htable;
-    private byte [][] trrInputColumns;
-
-    /**
-     * Restart from survivable exceptions by creating a new scanner.
-     *
-     * @param firstRow
-     * @throws IOException
-     */
-    public void restart(byte[] firstRow) throws IOException {
-      if ((endRow != null) && (endRow.length > 0)) {
-        if (trrRowFilter != null) {
-          Scan scan = new Scan(firstRow, endRow);
-          scan.addColumns(trrInputColumns);
-          scan.setFilter(trrRowFilter);
-          this.scanner = this.htable.getScanner(scan);
-        } else {
-          LOG.debug("TIFB.restart, firstRow: " +
-              Bytes.toStringBinary(firstRow) + ", endRow: " +
-              Bytes.toStringBinary(endRow));
-          Scan scan = new Scan(firstRow, endRow);
-          scan.addColumns(trrInputColumns);
-          this.scanner = this.htable.getScanner(scan);
-        }
-      } else {
-        LOG.debug("TIFB.restart, firstRow: " +
-            Bytes.toStringBinary(firstRow) + ", no endRow");
-
-        Scan scan = new Scan(firstRow);
-        scan.addColumns(trrInputColumns);
-//        scan.setFilter(trrRowFilter);
-        this.scanner = this.htable.getScanner(scan);
-      }
-    }
-
-    /**
-     * Build the scanner. Not done in constructor to allow for extension.
-     *
-     * @throws IOException
-     */
-    public void init() throws IOException {
-      restart(startRow);
-    }
-
-    /**
-     * @param htable the {@link HTable} to scan.
-     */
-    public void setHTable(HTable htable) {
-      this.htable = htable;
-    }
-
-    /**
-     * @param inputColumns the columns to be placed in {@link Result}.
-     */
-    public void setInputColumns(final byte [][] inputColumns) {
-      this.trrInputColumns = inputColumns;
-    }
-
-    /**
-     * @param startRow the first row in the split
-     */
-    public void setStartRow(final byte [] startRow) {
-      this.startRow = startRow;
-    }
-
-    /**
-     *
-     * @param endRow the last row in the split
-     */
-    public void setEndRow(final byte [] endRow) {
-      this.endRow = endRow;
-    }
-
-    /**
-     * @param rowFilter the {@link Filter} to be used.
-     */
-    public void setRowFilter(Filter rowFilter) {
-      this.trrRowFilter = rowFilter;
-    }
-
-    public void close() {
-      this.scanner.close();
-    }
-
-    /**
-     * @return ImmutableBytesWritable
-     *
-     * @see org.apache.hadoop.mapred.RecordReader#createKey()
-     */
-    public ImmutableBytesWritable createKey() {
-      return new ImmutableBytesWritable();
-    }
-
-    /**
-     * @return RowResult
-     *
-     * @see org.apache.hadoop.mapred.RecordReader#createValue()
-     */
-    public Result createValue() {
-      return new Result();
-    }
-
-    public long getPos() {
-      // This should be the ordinal tuple in the range;
-      // not clear how to calculate...
-      return 0;
-    }
-
-    public float getProgress() {
-      // Depends on the total number of tuples and getPos
-      return 0;
-    }
-
-    /**
-     * @param key HStoreKey as input key.
-     * @param value MapWritable as input value
-     * @return true if there was more data
-     * @throws IOException
-     */
-    public boolean next(ImmutableBytesWritable key, Result value)
-    throws IOException {
-      Result result;
-      try {
-        result = this.scanner.next();
-      } catch (UnknownScannerException e) {
-        LOG.debug("recovered from " + StringUtils.stringifyException(e));
-        restart(lastRow);
-        this.scanner.next();    // skip presumed already mapped row
-        result = this.scanner.next();
-      }
-
-      if (result != null && result.size() > 0) {
-        key.set(result.getRow());
-        lastRow = key.get();
-        Writables.copyWritable(result, value);
-        return true;
-      }
-      return false;
-    }
-  }
-
-  /**
    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
    * the default.
    *

Added: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java?rev=923404&view=auto
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java (added)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReader.java Mon Mar 15 19:37:48 2010
@@ -0,0 +1,138 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.RecordReader;
+
+
+/**
+ * Iterates over HBase table data, returning (ImmutableBytesWritable, Result) pairs.
+ */
+public class TableRecordReader
+implements RecordReader<ImmutableBytesWritable, Result> {
+
+  private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();
+  
+  /**
+   * Restart from survivable exceptions by creating a new scanner.
+   *
+   * @param firstRow
+   * @throws IOException
+   */
+  public void restart(byte[] firstRow) throws IOException {
+    this.recordReaderImpl.restart(firstRow);
+  }
+
+  /**
+   * Build the scanner. Not done in constructor to allow for extension.
+   *
+   * @throws IOException
+   */
+  public void init() throws IOException {
+    this.recordReaderImpl.restart(this.recordReaderImpl.getStartRow());
+  }
+
+  /**
+   * @param htable the {@link HTable} to scan.
+   */
+  public void setHTable(HTable htable) {
+    this.recordReaderImpl.setHTable(htable);
+  }
+
+  /**
+   * @param inputColumns the columns to be placed in {@link Result}.
+   */
+  public void setInputColumns(final byte [][] inputColumns) {
+    this.recordReaderImpl.setInputColumns(inputColumns);
+  }
+
+  /**
+   * @param startRow the first row in the split
+   */
+  public void setStartRow(final byte [] startRow) {
+    this.recordReaderImpl.setStartRow(startRow);
+  }
+
+  /**
+   *
+   * @param endRow the last row in the split
+   */
+  public void setEndRow(final byte [] endRow) {
+    this.recordReaderImpl.setEndRow(endRow);
+  }
+
+  /**
+   * @param rowFilter the {@link Filter} to be used.
+   */
+  public void setRowFilter(Filter rowFilter) {
+    this.recordReaderImpl.setRowFilter(rowFilter);
+  }
+
+  public void close() {
+    this.recordReaderImpl.close();
+  }
+
+  /**
+   * @return ImmutableBytesWritable
+   *
+   * @see org.apache.hadoop.mapred.RecordReader#createKey()
+   */
+  public ImmutableBytesWritable createKey() {
+    return this.recordReaderImpl.createKey();
+  }
+
+  /**
+   * @return Result
+   *
+   * @see org.apache.hadoop.mapred.RecordReader#createValue()
+   */
+  public Result createValue() {
+    return this.recordReaderImpl.createValue();
+  }
+
+  public long getPos() {
+    
+    // This should be the ordinal tuple in the range;
+    // not clear how to calculate...
+    return this.recordReaderImpl.getPos();
+  }
+
+  public float getProgress() {
+    // Depends on the total number of tuples and getPos
+    return this.recordReaderImpl.getProgress();
+  }
+
+  /**
+   * @param key ImmutableBytesWritable as input key.
+   * @param value Result as input value
+   * @return true if there was more data
+   * @throws IOException
+   */
+  public boolean next(ImmutableBytesWritable key, Result value)
+  throws IOException {
+    return this.recordReaderImpl.next(key, value);
+  }
+}
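
The old-API wrapper above keeps the surface that mapred TableInputFormatBase
expects while delegating everything to TableRecordReaderImpl. A hedged sketch
of driving it by hand ("table" and the "info:" column are illustrative):

    // Illustrative use of the mapred TableRecordReader; assumes an open HTable,
    // the imports in TableRecordReader.java above, plus
    // org.apache.hadoop.hbase.HConstants and org.apache.hadoop.hbase.util.Bytes.
    TableRecordReader reader = new TableRecordReader();
    reader.setHTable(table);
    reader.setInputColumns(new byte[][] { Bytes.toBytes("info:") });
    reader.setStartRow(HConstants.EMPTY_START_ROW);  // scan from the first row
    reader.init();                                   // builds the scanner
    ImmutableBytesWritable key = reader.createKey();
    Result value = reader.createValue();
    while (reader.next(key, value)) {
      // key holds the row; value holds that row's columns
    }
    reader.close();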

Added: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java?rev=923404&view=auto
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java (added)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java Mon Mar 15 19:37:48 2010
@@ -0,0 +1,192 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+
+import org.apache.hadoop.util.StringUtils;
+
+
+/**
+ * Iterates over HBase table data, returning (ImmutableBytesWritable, Result) pairs.
+ */
+public class TableRecordReaderImpl {
+  static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
+
+  private byte [] startRow;
+  private byte [] endRow;
+  private byte [] lastRow;
+  private Filter trrRowFilter;
+  private ResultScanner scanner;
+  private HTable htable;
+  private byte [][] trrInputColumns;
+
+  /**
+   * Restart from survivable exceptions by creating a new scanner.
+   *
+   * @param firstRow
+   * @throws IOException
+   */
+  public void restart(byte[] firstRow) throws IOException {
+    if ((endRow != null) && (endRow.length > 0)) {
+      if (trrRowFilter != null) {
+        Scan scan = new Scan(firstRow, endRow);
+        scan.addColumns(trrInputColumns);
+        scan.setFilter(trrRowFilter);
+        this.scanner = this.htable.getScanner(scan);
+      } else {
+        LOG.debug("TIFB.restart, firstRow: " +
+            Bytes.toStringBinary(firstRow) + ", endRow: " +
+            Bytes.toStringBinary(endRow));
+        Scan scan = new Scan(firstRow, endRow);
+        scan.addColumns(trrInputColumns);
+        this.scanner = this.htable.getScanner(scan);
+      }
+    } else {
+      LOG.debug("TIFB.restart, firstRow: " +
+          Bytes.toStringBinary(firstRow) + ", no endRow");
+
+      Scan scan = new Scan(firstRow);
+      scan.addColumns(trrInputColumns);
+//      scan.setFilter(trrRowFilter);
+      this.scanner = this.htable.getScanner(scan);
+    }
+  }
+
+  /**
+   * Build the scanner. Not done in constructor to allow for extension.
+   *
+   * @throws IOException
+   */
+  public void init() throws IOException {
+    restart(startRow);
+  }
+
+  byte[] getStartRow() { 
+    return this.startRow;
+  }
+  /**
+   * @param htable the {@link HTable} to scan.
+   */
+  public void setHTable(HTable htable) {
+    this.htable = htable;
+  }
+
+  /**
+   * @param inputColumns the columns to be placed in {@link Result}.
+   */
+  public void setInputColumns(final byte [][] inputColumns) {
+    this.trrInputColumns = inputColumns;
+  }
+
+  /**
+   * @param startRow the first row in the split
+   */
+  public void setStartRow(final byte [] startRow) {
+    this.startRow = startRow;
+  }
+
+  /**
+   *
+   * @param endRow the last row in the split
+   */
+  public void setEndRow(final byte [] endRow) {
+    this.endRow = endRow;
+  }
+
+  /**
+   * @param rowFilter the {@link Filter} to be used.
+   */
+  public void setRowFilter(Filter rowFilter) {
+    this.trrRowFilter = rowFilter;
+  }
+
+  public void close() {
+    this.scanner.close();
+  }
+
+  /**
+   * @return ImmutableBytesWritable
+   *
+   * @see org.apache.hadoop.mapred.RecordReader#createKey()
+   */
+  public ImmutableBytesWritable createKey() {
+    return new ImmutableBytesWritable();
+  }
+
+  /**
+   * @return Result
+   *
+   * @see org.apache.hadoop.mapred.RecordReader#createValue()
+   */
+  public Result createValue() {
+    return new Result();
+  }
+
+  public long getPos() {
+    // This should be the ordinal tuple in the range;
+    // not clear how to calculate...
+    return 0;
+  }
+
+  public float getProgress() {
+    // Depends on the total number of tuples and getPos
+    return 0;
+  }
+
+  /**
+   * @param key ImmutableBytesWritable as input key.
+   * @param value Result as input value
+   * @return true if there was more data
+   * @throws IOException
+   */
+  public boolean next(ImmutableBytesWritable key, Result value)
+  throws IOException {
+    Result result;
+    try {
+      result = this.scanner.next();
+    } catch (UnknownScannerException e) {
+      LOG.debug("recovered from " + StringUtils.stringifyException(e));
+      restart(lastRow);
+      this.scanner.next();    // skip presumed already mapped row
+      result = this.scanner.next();
+    }
+
+    if (result != null && result.size() > 0) {
+      key.set(result.getRow());
+      lastRow = key.get();
+      Writables.copyWritable(result, value);
+      return true;
+    }
+    return false;
+  }
+}
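
Unlike the mapreduce flavor further down, this impl is configured piecewise
(columns, start row, end row, filter) rather than through a Scan. A hedged
sketch of a bounded, filtered scan outside MapReduce (row keys, column, and
filter choice are illustrative):

    // Illustrative: bounded scan with a row filter via the mapred impl.
    // Assumes the imports in TableRecordReaderImpl.java above, plus
    // org.apache.hadoop.hbase.filter.PrefixFilter.
    TableRecordReaderImpl impl = new TableRecordReaderImpl();
    impl.setHTable(table);                           // assumed open HTable
    impl.setInputColumns(new byte[][] { Bytes.toBytes("info:") });
    impl.setStartRow(Bytes.toBytes("row-0100"));
    impl.setEndRow(Bytes.toBytes("row-0200"));
    impl.setRowFilter(new PrefixFilter(Bytes.toBytes("row-01")));
    impl.init();
    ImmutableBytesWritable key = impl.createKey();
    Result value = impl.createValue();
    while (impl.next(key, value)) {
      // rows in [row-0100, row-0200) that also pass the prefix filter
    }
    impl.close();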

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=923404&r1=923403&r2=923404&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Mon Mar 15 19:37:48 2010
@@ -79,154 +79,7 @@ extends InputFormat<ImmutableBytesWritab
   /** The reader scanning the table, can be a custom one. */
   private TableRecordReader tableRecordReader = null;
 
-  /**
-   * Iterate over an HBase table data, return (ImmutableBytesWritable, Result) 
-   * pairs.
-   */
-  protected class TableRecordReader
-  extends RecordReader<ImmutableBytesWritable, Result> {
-    
-    private ResultScanner scanner = null;
-    private Scan scan = null;
-    private HTable htable = null;
-    private byte[] lastRow = null;
-    private ImmutableBytesWritable key = null;
-    private Result value = null;
-
-    /**
-     * Restart from survivable exceptions by creating a new scanner.
-     *
-     * @param firstRow  The first row to start at.
-     * @throws IOException When restarting fails.
-     */
-    public void restart(byte[] firstRow) throws IOException {
-      Scan newScan = new Scan(scan);
-      newScan.setStartRow(firstRow);
-      this.scanner = this.htable.getScanner(newScan);
-    }
-
-    /**
-     * Build the scanner. Not done in constructor to allow for extension.
-     *
-     * @throws IOException When restarting the scan fails. 
-     */
-    public void init() throws IOException {
-      restart(scan.getStartRow());
-    }
-
-    /**
-     * Sets the HBase table.
-     * 
-     * @param htable  The {@link HTable} to scan.
-     */
-    public void setHTable(HTable htable) {
-      this.htable = htable;
-    }
-
-    /**
-     * Sets the scan defining the actual details like columns etc.
-     *  
-     * @param scan  The scan to set.
-     */
-    public void setScan(Scan scan) {
-      this.scan = scan;
-    }
-
-    /**
-     * Closes the split.
-     * 
-     * @see org.apache.hadoop.mapreduce.RecordReader#close()
-     */
-    @Override
-    public void close() {
-      this.scanner.close();
-    }
-
-    /**
-     * Returns the current key.
-     *  
-     * @return The current key.
-     * @throws IOException
-     * @throws InterruptedException When the job is aborted.
-     * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
-     */
-    @Override
-    public ImmutableBytesWritable getCurrentKey() throws IOException,
-        InterruptedException {
-      return key;
-    }
-
-    /**
-     * Returns the current value.
-     * 
-     * @return The current value.
-     * @throws IOException When the value is faulty.
-     * @throws InterruptedException When the job is aborted.
-     * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
-     */
-    @Override
-    public Result getCurrentValue() throws IOException, InterruptedException {
-      return value;
-    }
-
-    /**
-     * Initializes the reader.
-     * 
-     * @param inputsplit  The split to work with.
-     * @param context  The current task context.
-     * @throws IOException When setting up the reader fails.
-     * @throws InterruptedException When the job is aborted.
-     * @see org.apache.hadoop.mapreduce.RecordReader#initialize(
-     *   org.apache.hadoop.mapreduce.InputSplit, 
-     *   org.apache.hadoop.mapreduce.TaskAttemptContext)
-     */
-    @Override
-    public void initialize(InputSplit inputsplit,
-        TaskAttemptContext context) throws IOException,
-        InterruptedException {
-    }
-
-    /**
-     * Positions the record reader to the next record.
-     *  
-     * @return <code>true</code> if there was another record.
-     * @throws IOException When reading the record failed.
-     * @throws InterruptedException When the job was aborted.
-     * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
-     */
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      if (key == null) key = new ImmutableBytesWritable();
-      if (value == null) value = new Result();
-      try {
-        value = this.scanner.next();
-      } catch (IOException e) {
-        LOG.debug("recovered from " + StringUtils.stringifyException(e));  
-        restart(lastRow);
-        scanner.next();    // skip presumed already mapped row
-        value = scanner.next();
-      }
-      if (value != null && value.size() > 0) {
-        key.set(value.getRow());
-        lastRow = key.get();
-        return true;
-      }
-      return false;
-    }
-
-    /**
-     * The current progress of the record reader through its data.
-     * 
-     * @return A number between 0.0 and 1.0, the fraction of the data read.
-     * @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
-     */
-    @Override
-    public float getProgress() {
-      // Depends on the total number of tuples
-      return 0;
-    }
-  }
-
+ 
   /**
    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
    * the default.
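
The inner class removed here is replaced by the top-level TableRecordReader
added below; the factory method this javadoc introduces presumably wires one
up per split along these lines (a hypothetical sketch only; the actual method
body sits outside this diff):

    // Hypothetical createRecordReader() wiring; not shown in this diff.
    TableSplit tSplit = (TableSplit) split;            // the split handed in
    TableRecordReader trr = (this.tableRecordReader != null)
        ? this.tableRecordReader : new TableRecordReader();  // honor a custom reader
    Scan sc = new Scan(this.scan);                     // copy the configured scan
    sc.setStartRow(tSplit.getStartRow());              // clamp to the split's range
    sc.setStopRow(tSplit.getEndRow());
    trr.setScan(sc);
    trr.setHTable(this.table);
    trr.init();
    return trr;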

Added: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java?rev=923404&view=auto
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java (added)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReader.java Mon Mar 15 19:37:48 2010
@@ -0,0 +1,155 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Iterates over HBase table data, returning (ImmutableBytesWritable, Result)
+ * pairs.
+ */
+public class TableRecordReader
+extends RecordReader<ImmutableBytesWritable, Result> {
+  
+  private TableRecordReaderImpl recordReaderImpl = new TableRecordReaderImpl();
+  
+  /**
+   * Restart from survivable exceptions by creating a new scanner.
+   *
+   * @param firstRow  The first row to start at.
+   * @throws IOException When restarting fails.
+   */
+  public void restart(byte[] firstRow) throws IOException {
+    this.recordReaderImpl.restart(firstRow);
+  }
+
+  /**
+   * Build the scanner. Not done in constructor to allow for extension.
+   *
+   * @throws IOException When restarting the scan fails. 
+   */
+  public void init() throws IOException {
+    this.recordReaderImpl.init();
+  }
+
+  /**
+   * Sets the HBase table.
+   * 
+   * @param htable  The {@link HTable} to scan.
+   */
+  public void setHTable(HTable htable) {
+    this.recordReaderImpl.setHTable(htable);
+  }
+
+  /**
+   * Sets the scan defining the actual details like columns etc.
+   *  
+   * @param scan  The scan to set.
+   */
+  public void setScan(Scan scan) {
+    this.recordReaderImpl.setScan(scan);
+  }
+
+  /**
+   * Closes the split.
+   * 
+   * @see org.apache.hadoop.mapreduce.RecordReader#close()
+   */
+  @Override
+  public void close() {
+    this.recordReaderImpl.close();
+  }
+
+  /**
+   * Returns the current key.
+   *  
+   * @return The current key.
+   * @throws IOException
+   * @throws InterruptedException When the job is aborted.
+   * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
+   */
+  @Override
+  public ImmutableBytesWritable getCurrentKey() throws IOException,
+      InterruptedException {
+    return this.recordReaderImpl.getCurrentKey();
+  }
+
+  /**
+   * Returns the current value.
+   * 
+   * @return The current value.
+   * @throws IOException When the value is faulty.
+   * @throws InterruptedException When the job is aborted.
+   * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
+   */
+  @Override
+  public Result getCurrentValue() throws IOException, InterruptedException {
+    return this.recordReaderImpl.getCurrentValue();
+  }
+
+  /**
+   * Initializes the reader.
+   * 
+   * @param inputsplit  The split to work with.
+   * @param context  The current task context.
+   * @throws IOException When setting up the reader fails.
+   * @throws InterruptedException When the job is aborted.
+   * @see org.apache.hadoop.mapreduce.RecordReader#initialize(
+   *   org.apache.hadoop.mapreduce.InputSplit, 
+   *   org.apache.hadoop.mapreduce.TaskAttemptContext)
+   */
+  @Override
+  public void initialize(InputSplit inputsplit,
+      TaskAttemptContext context) throws IOException,
+      InterruptedException {
+  }
+
+  /**
+   * Positions the record reader to the next record.
+   *  
+   * @return <code>true</code> if there was another record.
+   * @throws IOException When reading the record failed.
+   * @throws InterruptedException When the job was aborted.
+   * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return this.recordReaderImpl.nextKeyValue();
+  }
+
+  /**
+   * The current progress of the record reader through its data.
+   * 
+   * @return A number between 0.0 and 1.0, the fraction of the data read.
+   * @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
+   */
+  @Override
+  public float getProgress() {
+    return this.recordReaderImpl.getProgress();
+  }
+}
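
Because all scanner state lives in the impl, a job can still swap in custom
behavior by subclassing this thin wrapper (the tableRecordReader field in
TableInputFormatBase exists for exactly that). A hedged sketch; the subclass
and its row counting are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.hbase.mapreduce.TableRecordReader;

    // Illustrative subclass; not part of this commit.
    public class CountingTableRecordReader extends TableRecordReader {
      private long rows = 0;

      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        boolean more = super.nextKeyValue();
        if (more) rows++;                // count rows handed to the mapper
        return more;
      }

      public long getRows() {
        return rows;
      }
    }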

Added: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java?rev=923404&view=auto
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java (added)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java Mon Mar 15 19:37:48 2010
@@ -0,0 +1,157 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Iterates over HBase table data, returning (ImmutableBytesWritable, Result)
+ * pairs.
+ */
+public class TableRecordReaderImpl {
+
+
+  static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
+
+  private ResultScanner scanner = null;
+  private Scan scan = null;
+  private HTable htable = null;
+  private byte[] lastRow = null;
+  private ImmutableBytesWritable key = null;
+  private Result value = null;
+
+  /**
+   * Restart from survivable exceptions by creating a new scanner.
+   *
+   * @param firstRow  The first row to start at.
+   * @throws IOException When restarting fails.
+   */
+  public void restart(byte[] firstRow) throws IOException {
+    Scan newScan = new Scan(scan);
+    newScan.setStartRow(firstRow);
+    this.scanner = this.htable.getScanner(newScan);
+  }
+
+  /**
+   * Build the scanner. Not done in constructor to allow for extension.
+   *
+   * @throws IOException When restarting the scan fails. 
+   */
+  public void init() throws IOException {
+    restart(scan.getStartRow());
+  }
+
+  /**
+   * Sets the HBase table.
+   * 
+   * @param htable  The {@link HTable} to scan.
+   */
+  public void setHTable(HTable htable) {
+    this.htable = htable;
+  }
+
+  /**
+   * Sets the scan defining the actual details like columns etc.
+   *  
+   * @param scan  The scan to set.
+   */
+  public void setScan(Scan scan) {
+    this.scan = scan;
+  }
+
+  /**
+   * Closes the split.
+   * 
+   * 
+   */
+  public void close() {
+    this.scanner.close();
+  }
+
+  /**
+   * Returns the current key.
+   *  
+   * @return The current key.
+   * @throws IOException
+   * @throws InterruptedException When the job is aborted.
+   */
+  public ImmutableBytesWritable getCurrentKey() throws IOException,
+      InterruptedException {
+    return key;
+  }
+
+  /**
+   * Returns the current value.
+   * 
+   * @return The current value.
+   * @throws IOException When the value is faulty.
+   * @throws InterruptedException When the job is aborted.
+   */
+  public Result getCurrentValue() throws IOException, InterruptedException {
+    return value;
+  }
+
+
+  /**
+   * Positions the record reader to the next record.
+   *  
+   * @return <code>true</code> if there was another record.
+   * @throws IOException When reading the record failed.
+   * @throws InterruptedException When the job was aborted.
+   */
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (key == null) key = new ImmutableBytesWritable();
+    if (value == null) value = new Result();
+    try {
+      value = this.scanner.next();
+    } catch (IOException e) {
+      LOG.debug("recovered from " + StringUtils.stringifyException(e));  
+      restart(lastRow);
+      scanner.next();    // skip presumed already mapped row
+      value = scanner.next();
+    }
+    if (value != null && value.size() > 0) {
+      key.set(value.getRow());
+      lastRow = key.get();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * The current progress of the record reader through its data.
+   * 
+   * @return A number between 0.0 and 1.0, the fraction of the data read.
+   */
+  public float getProgress() {
+    // Depends on the total number of tuples
+    return 0;
+  }
+  
+}


