hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r599643 - in /lucene/hadoop/trunk/src/contrib/hbase: CHANGES.txt src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
Date Fri, 30 Nov 2007 00:01:28 GMT
Author: stack
Date: Thu Nov 29 16:01:25 2007
New Revision: 599643

URL: http://svn.apache.org/viewvc?rev=599643&view=rev
Log:
HADOOP-2234 TableInputFormat erroneously aggregates map values

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=599643&r1=599642&r2=599643&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Thu Nov 29 16:01:25 2007
@@ -40,6 +40,7 @@
    HADOOP-2253 getRow can return HBASE::DELETEVAL cells
                (Bryan Duxbury via Stack)
    HADOOP-2295 Fix assigning a region to multiple servers
+   HADOOP-2234 TableInputFormat erroneously aggregates map values
 
   IMPROVEMENTS
    HADOOP-2401 Add convenience put method that takes writable

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java?rev=599643&r1=599642&r2=599643&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java Thu Nov 29 16:01:25 2007
@@ -48,8 +48,7 @@
  * Convert HBase tabular data into a format that is consumable by Map/Reduce
  */
 public class TableInputFormat
-implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
-  
+implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {  
   static final Logger LOG = Logger.getLogger(TableInputFormat.class.getName());
 
   /**
@@ -67,9 +66,9 @@
    * return (HStoreKey, MapWritable<Text, ImmutableBytesWritable>) pairs
    */
   class TableRecordReader implements RecordReader<HStoreKey, MapWritable> {
-    private HScannerInterface m_scanner;
-    private SortedMap<Text, byte[]> m_row; // current buffer
-    private Text m_endRow;
+    private final HScannerInterface m_scanner;
+    // current buffer
+    private final SortedMap<Text, byte[]> m_row = new TreeMap<Text, byte[]>();
 
     /**
      * Constructor
@@ -78,14 +77,15 @@
      * @throws IOException
      */
     public TableRecordReader(Text startRow, Text endRow) throws IOException {
-      m_row = new TreeMap<Text, byte[]>();
-      m_scanner = m_table.obtainScanner(m_cols, startRow);
-      m_endRow = endRow;
+      if (endRow != null && endRow.getLength() > 0) {
+        this.m_scanner = m_table.obtainScanner(m_cols, startRow, endRow);
+      } else {
+        this.m_scanner = m_table.obtainScanner(m_cols, startRow);
+      }
     }
 
-    /** {@inheritDoc} */
     public void close() throws IOException {
-      m_scanner.close();
+      this.m_scanner.close();
     }
 
     /**
@@ -132,20 +132,14 @@
      */
     @SuppressWarnings("unchecked")
     public boolean next(HStoreKey key, MapWritable value) throws IOException {
-      m_row.clear();
+      this.m_row.clear();
       HStoreKey tKey = key;
-      boolean hasMore = m_scanner.next(tKey, m_row);
-
-      if(hasMore) {
-        if(m_endRow.getLength() > 0 &&
-            (tKey.getRow().compareTo(m_endRow) >= 0)) {
-          
-          hasMore = false;
-          
-        } else {
-          for(Map.Entry<Text, byte[]> e: m_row.entrySet()) {
-            value.put(e.getKey(), new ImmutableBytesWritable(e.getValue()));
-          }
+      boolean hasMore = this.m_scanner.next(tKey, this.m_row);
+      if (hasMore) {
+        // Clear value to remove content added by previous call to next.
+        value.clear();
+        for (Map.Entry<Text, byte[]> e: this.m_row.entrySet()) {
+          value.put(e.getKey(), new ImmutableBytesWritable(e.getValue()));
         }
       }
       return hasMore;
@@ -153,12 +147,11 @@
 
   }
 
-  /** {@inheritDoc} */
   public RecordReader<HStoreKey, MapWritable> getRecordReader(
       InputSplit split,
       @SuppressWarnings("unused") JobConf job,
-      @SuppressWarnings("unused") Reporter reporter) throws IOException {
-    
+      @SuppressWarnings("unused") Reporter reporter)
+  throws IOException {  
     TableSplit tSplit = (TableSplit)split;
     return new TableRecordReader(tSplit.getStartRow(), tSplit.getEndRow());
   }
@@ -185,7 +178,6 @@
     return splits;
   }
 
-  /** {@inheritDoc} */
   public void configure(JobConf job) {
     Path[] tableNames = job.getInputPaths();
     m_tableName = new Text(tableNames[0].getName());
@@ -202,21 +194,17 @@
     }
   }
 
-  /** {@inheritDoc} */
   public void validateInput(JobConf job) throws IOException {
-
     // expecting exactly one path
-    
     Path[] tableNames = job.getInputPaths();
     if(tableNames == null || tableNames.length > 1) {
       throw new IOException("expecting one table name");
     }
 
     // expecting at least one column
-    
     String colArg = job.get(COLUMN_LIST);
     if(colArg == null || colArg.length() == 0) {
       throw new IOException("expecting at least one column");
     }
   }
-}
+}
\ No newline at end of file



Mime
View raw message