hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r787336 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/mapreduce/ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/regionserver/
Date Mon, 22 Jun 2009 18:08:33 GMT
Author: stack
Date: Mon Jun 22 18:08:33 2009
New Revision: 787336

URL: http://svn.apache.org/viewvc?rev=787336&view=rev
Log:
HBASE-1558 deletes use 'HConstants.LATEST_TIMESTAMP' but no one translates that into 'now'

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=787336&r1=787335&r2=787336&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Jun 22 18:08:33 2009
@@ -212,6 +212,8 @@
    HBASE-1547  atomicIncrement doesnt increase hregion.memcacheSize
    HBASE-1553  ClassSize missing in trunk
    HBASE-1561  HTable Mismatch between javadoc and what it actually does
+   HBASE-1558  deletes use 'HConstants.LATEST_TIMESTAMP' but no one translates
+               that into 'now'
 
   IMPROVEMENTS
    HBASE-1089  Add count of regions on filesystem to master UI; add percentage

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java?rev=787336&r1=787335&r2=787336&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java Mon Jun 22 18:08:33 2009
@@ -43,6 +43,9 @@
    * space delimited list of columns
    */
   public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
+  
+  public static final String TIME_RANGE_MAX = "hbase.mapred.timerange.max";
+  public static final String TIME_RANGE_MIN = "hbase.mapred.timerange.min";
 
   public void configure(JobConf job) {
     Path[] tableNames = FileInputFormat.getInputPaths(job);
@@ -53,6 +56,11 @@
       m_cols[i] = Bytes.toBytes(colNames[i]);
     }
     setInputColumns(m_cols);
+    
+    String minArg = job.get(TIME_RANGE_MIN);
+    String maxArg = job.get(TIME_RANGE_MAX);
+    setTimeRange(Long.parseLong(minArg), Long.parseLong(maxArg));
+    
     try {
       setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
     } catch (Exception e) {
@@ -78,5 +86,15 @@
     if (colArg == null || colArg.length() == 0) {
       throw new IOException("expecting at least one column");
     }
+    
+    // maxStamp must be higher than minStamp
+    String minArg = job.get(TIME_RANGE_MIN);
+    String maxArg = job.get(TIME_RANGE_MAX);
+    if (minArg == null || minArg.length() == 0 
+        || maxArg == null || maxArg.length() == 0
+        || Long.parseLong(maxArg) <= Long.parseLong(minArg)) {
+        throw new IOException("invalid time stamp values"); 
+    }
+    
   }
 }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=787336&r1=787335&r2=787336&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Mon Jun 22 18:08:33 2009
@@ -29,8 +29,8 @@
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.filter.StopRowFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
@@ -77,6 +77,7 @@
 implements InputFormat<ImmutableBytesWritable, Result> {
   final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
   private byte [][] inputColumns;
+  private long minStamp, maxStamp;
   private HTable table;
   private TableRecordReader tableRecordReader;
   private RowFilterInterface rowFilter;
@@ -93,7 +94,8 @@
     private ResultScanner scanner;
     private HTable htable;
     private byte [][] trrInputColumns;
-
+    private long minStamp, maxStamp;
+    
     /**
      * Restart from survivable exceptions by creating a new scanner.
      *
@@ -101,28 +103,33 @@
      * @throws IOException
      */
     public void restart(byte[] firstRow) throws IOException {
+      Scan scan = null;
       if ((endRow != null) && (endRow.length > 0)) {
         if (trrRowFilter != null) {
           final Set<RowFilterInterface> rowFiltersSet =
             new HashSet<RowFilterInterface>();
           rowFiltersSet.add(new WhileMatchRowFilter(new StopRowFilter(endRow)));
           rowFiltersSet.add(trrRowFilter);
-          Scan scan = new Scan(startRow);
-          scan.addColumns(trrInputColumns);
+          scan = new Scan(startRow);
 //          scan.setFilter(new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
 //              rowFiltersSet));
           this.scanner = this.htable.getScanner(scan);
         } else {
-          Scan scan = new Scan(firstRow, endRow);
-          scan.addColumns(trrInputColumns);
-          this.scanner = this.htable.getScanner(scan);
+          scan = new Scan(firstRow, endRow);
+
         }
       } else {
-        Scan scan = new Scan(firstRow);
-        scan.addColumns(trrInputColumns);
+        scan = new Scan(firstRow);
 //        scan.setFilter(trrRowFilter);
-        this.scanner = this.htable.getScanner(scan);
       }
+      scan.addColumns(trrInputColumns);
+      
+      if (minStamp !=0 && maxStamp !=0) { // only set a range when timestamps were given (0 means unset).
+        scan.setTimeRange(minStamp, maxStamp);
+        scan.setMaxVersions();
+      }
+     
+      this.scanner = this.htable.getScanner(scan);
     }
 
     /**
@@ -147,6 +154,11 @@
     public void setInputColumns(final byte [][] inputColumns) {
       this.trrInputColumns = inputColumns;
     }
+    
+    public void setTimeRange(long minStamp, long maxStamp) {
+      this.minStamp = minStamp;
+      this.maxStamp = maxStamp;
+    }
 
     /**
      * @param startRow the first row in the split
@@ -251,6 +263,7 @@
     trr.setEndRow(tSplit.getEndRow());
     trr.setHTable(this.table);
     trr.setInputColumns(this.inputColumns);
+    trr.setTimeRange(this.minStamp, this.maxStamp);
     trr.setRowFilter(this.rowFilter);
     trr.init();
     return trr;
@@ -310,6 +323,16 @@
     this.inputColumns = inputColumns;
   }
 
+
+  /**
+   * @param minStamp
+   * @param maxStamp
+   */
+  protected void setTimeRange(long minStamp, long maxStamp) {
+  	this.minStamp = minStamp;
+  	this.maxStamp = maxStamp;
+  }
+  
   /**
    * Allows subclasses to get the {@link HTable}.
    */
@@ -344,4 +367,5 @@
   protected void setRowFilter(RowFilterInterface rowFilter) {
     this.rowFilter = rowFilter;
   }
+
 }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=787336&r1=787335&r2=787336&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Mon Jun 22 18:08:33 2009
@@ -51,13 +51,39 @@
     Class<? extends TableMap> mapper, 
     Class<? extends WritableComparable> outputKeyClass, 
     Class<? extends Writable> outputValueClass, JobConf job) {
-      
+    
+    initTableMapJob(table, columns, mapper, outputKeyClass,
+      outputValueClass, job, 0, 0);
+  }
+  
+  /**
+   * Use this before submitting a TableMap job. It will
+   * appropriately set up the JobConf.
+   * 
+   * @param table  The table name to read from.
+   * @param columns  The columns to scan.
+   * @param mapper  The mapper class to use.
+   * @param outputKeyClass  The class of the output key.
+   * @param outputValueClass  The class of the output value.
+   * @param job  The current job configuration to adjust.
+   * @param minStamp  the minimum timestamp, inclusive
+   * @param maxStamp  the maximum timestamp, exclusive
+   */
+  public static void initTableMapJob(String table, String columns,
+    Class<? extends TableMap> mapper, 
+    Class<? extends WritableComparable> outputKeyClass, 
+    Class<? extends Writable> outputValueClass, JobConf job,
+    long minStamp,
+    long maxStamp) {
+		      
     job.setInputFormat(TableInputFormat.class);
     job.setMapOutputValueClass(outputValueClass);
     job.setMapOutputKeyClass(outputKeyClass);
     job.setMapperClass(mapper);
     FileInputFormat.addInputPaths(job, table);
     job.set(TableInputFormat.COLUMN_LIST, columns);
+    job.setLong(TableInputFormat.TIME_RANGE_MIN, minStamp);
+    job.setLong(TableInputFormat.TIME_RANGE_MAX, maxStamp);	    
   }
   
   /**

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=787336&r1=787335&r2=787336&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Jun 22 18:08:33 2009
@@ -1128,8 +1128,10 @@
   public void delete(byte [] family, List<KeyValue> kvs, boolean writeToWAL)
   throws IOException {
     long now = System.currentTimeMillis();
+    byte [] byteNow = Bytes.toBytes(now);
     boolean flush = false;
     this.updatesLock.readLock().lock();
+
     try {
       if (writeToWAL) {
         this.log.append(regionInfo.getRegionName(),
@@ -1158,7 +1160,10 @@
           KeyValue getkv = result.get(0);
           Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
             getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
+        } else {
+          kv.updateLatestStamp(byteNow);
         }
+
         size = this.memcacheSize.addAndGet(store.delete(kv));
       }
       flush = isFlushSize(size);

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java?rev=787336&r1=787335&r2=787336&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Mon Jun 22 18:08:33 2009
@@ -29,6 +29,7 @@
 import java.util.SortedMap;
 
 import junit.framework.TestCase;
+import junit.framework.AssertionFailedError;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -658,4 +659,13 @@
       root.getLog().closeAndDelete();
     }
   }
+
+  public void assertByteEquals(byte[] expected,
+                               byte[] actual) {
+    if (Bytes.compareTo(expected, actual) != 0) {
+      throw new AssertionFailedError("expected:<" +
+      Bytes.toString(expected) + "> but was:<" +
+      Bytes.toString(actual) + ">");
+    }
+  }
 }

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=787336&r1=787335&r2=787336&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java Mon Jun 22 18:08:33 2009
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.TreeMap;
+import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -57,7 +58,14 @@
   private final String DIR = "test/build/data/TestHRegion/";
   
   private final int MAX_VERSIONS = 2;
-  
+
+  // Test names
+  private final byte[] tableName = Bytes.toBytes("testtable");;
+  private final byte[] qual1 = Bytes.toBytes("qual1");
+  private final byte[] value1 = Bytes.toBytes("value1");
+  private final byte[] value2 = Bytes.toBytes("value2");
+  private final byte [] row = Bytes.toBytes("rowA");
+
   /**
    * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
    */
@@ -325,13 +333,68 @@
     assertTrue(Bytes.equals(rowB, results.get(0).getRow()));
 
   }
+
+  public void testDeleteColumns_PostInsert() throws IOException,
+      InterruptedException {
+    Delete delete = new Delete(row);
+    delete.deleteColumns(fam1, qual1);
+    doTestDelete_AndPostInsert(delete);
+  }
+
+  public void testDeleteFamily_PostInsert() throws IOException, InterruptedException {
+    Delete delete = new Delete(row);
+    delete.deleteFamily(fam1);
+    doTestDelete_AndPostInsert(delete);
+  }
+
+  public void doTestDelete_AndPostInsert(Delete delete)
+      throws IOException, InterruptedException {
+    initHRegion(tableName, getName(), fam1);
+    Put put = new Put(row);
+    put.add(fam1, qual1, value1);
+    region.put(put);
+
+    Thread.sleep(10);
+    
+    // now delete the value:
+    region.delete(delete, null, true);
+
+    Thread.sleep(10);
+
+    // ok put data:
+    put = new Put(row);
+    put.add(fam1, qual1, value2);
+    region.put(put);
+
+    // ok get:
+    Get get = new Get(row);
+    get.addColumn(fam1, qual1);
+
+    Result r = region.get(get, null);
+    assertEquals(1, r.size());
+    assertByteEquals(value2, r.getValue(fam1, qual1));
+
+    // next:
+    Scan scan = new Scan(row);
+    scan.addColumn(fam1, qual1);
+    InternalScanner s = region.getScanner(scan);
+
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    assertEquals(false, s.next(results));
+    assertEquals(1, results.size());
+    KeyValue kv = results.get(0);
+
+    assertByteEquals(value2, kv.getValue());
+    assertByteEquals(fam1, kv.getFamily());
+    assertByteEquals(qual1, kv.getQualifier());
+    assertByteEquals(row, kv.getRow());
+  }
+
+
   
-  //Visual test, since the method doesn't return anything
   public void testDelete_CheckTimestampUpdated()
   throws IOException {
-    byte [] tableName = Bytes.toBytes("testtable");
     byte [] row1 = Bytes.toBytes("row1");
-    byte [] fam1 = Bytes.toBytes("fam1");
     byte [] col1 = Bytes.toBytes("col1");
     byte [] col2 = Bytes.toBytes("col2");
     byte [] col3 = Bytes.toBytes("col3");
@@ -345,8 +408,19 @@
     kvs.add(new KeyValue(row1, fam1, col1, null));
     kvs.add(new KeyValue(row1, fam1, col2, null));
     kvs.add(new KeyValue(row1, fam1, col3, null));
-    
+
     region.delete(fam1, kvs, true);
+
+    // extract the key values out the memcache:
+    // This is kinda hacky, but better than nothing...
+    long now = System.currentTimeMillis();
+    KeyValue firstKv = region.getStore(fam1).memcache.memcache.first();
+    assertTrue(firstKv.getTimestamp() <= now);
+    now = firstKv.getTimestamp();
+    for (KeyValue kv : region.getStore(fam1).memcache.memcache) {
+      assertTrue(kv.getTimestamp() <= now);
+      now = kv.getTimestamp();
+    }
   }
   
   //////////////////////////////////////////////////////////////////////////////
@@ -1054,15 +1128,14 @@
     byte [] qf1 = Bytes.toBytes("qualifier1");
     byte [] qf2 = Bytes.toBytes("qualifier2");
     byte [] fam1 = Bytes.toBytes("fam1");
-    byte [][] families = {fam1};
-    
+
     long ts1 = 1; //System.currentTimeMillis();
     long ts2 = ts1 + 1;
     long ts3 = ts1 + 2;
     
     //Setting up region
     String method = this.getName();
-    initHRegion(tableName, method, families);
+    initHRegion(tableName, method, fam1);
     
     //Putting data in Region
     Put put = null;
@@ -1105,15 +1178,64 @@
       assertEquals(expected.get(i), actual.get(i));
     }
   }
+
+  public void testScanner_StopRow1542() throws IOException {
+    byte [] tableName = Bytes.toBytes("test_table");
+    byte [] family = Bytes.toBytes("testFamily");
+    initHRegion(tableName, getName(), family);
+
+    byte [] row1 = Bytes.toBytes("row111");
+    byte [] row2 = Bytes.toBytes("row222");
+    byte [] row3 = Bytes.toBytes("row333");
+    byte [] row4 = Bytes.toBytes("row444");
+    byte [] row5 = Bytes.toBytes("row555");
+
+    byte [] col1 = Bytes.toBytes("Pub111");
+    byte [] col2 = Bytes.toBytes("Pub222");
+
+
+
+    Put put = new Put(row1);
+    put.add(family, col1, Bytes.toBytes(10L));
+    region.put(put);
+
+    put = new Put(row2);
+    put.add(family, col1, Bytes.toBytes(15L));
+    region.put(put);
+
+    put = new Put(row3);
+    put.add(family, col2, Bytes.toBytes(20L));
+    region.put(put);
+
+    put = new Put(row4);
+    put.add(family, col2, Bytes.toBytes(30L));
+    region.put(put);
+
+    put = new Put(row5);
+    put.add(family, col1, Bytes.toBytes(40L));
+    region.put(put);
+
+    Scan scan = new Scan(row3, row4);
+    scan.setMaxVersions();
+    scan.addColumn(family, col1);
+    InternalScanner s = region.getScanner(scan);
+
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    assertEquals(false, s.next(results));
+    assertEquals(0, results.size());
+
+
+
+    
+  }
   
   public void testScanner_Wildcard_FromMemcacheAndFiles_EnforceVersions()
   throws IOException {
     byte [] tableName = Bytes.toBytes("testtable");
     byte [] row1 = Bytes.toBytes("row1");
     byte [] fam1 = Bytes.toBytes("fam1");
-    byte [][] families = {fam1};
     byte [] qf1 = Bytes.toBytes("qualifier1");
-    byte [] qf2 = Bytes.toBytes("qualifier2");
+    byte [] qf2 = Bytes.toBytes("quateslifier2");
     
     long ts1 = 1;
     long ts2 = ts1 + 1;
@@ -1122,7 +1244,7 @@
     
     //Setting up region
     String method = this.getName();
-    initHRegion(tableName, method, families);
+    initHRegion(tableName, method, fam1);
     
     //Putting data in Region
     KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);



Mime
View raw message