hbase-commits mailing list archives

From: st...@apache.org
Subject: svn commit: r732094 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/client/
Date: Tue, 06 Jan 2009 20:21:17 GMT
Author: stack
Date: Tue Jan  6 12:21:17 2009
New Revision: 732094

URL: http://svn.apache.org/viewvc?rev=732094&view=rev
Log:
HBASE-1090 Atomic Check And Save in HTable

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestHTable.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=732094&r1=732093&r2=732094&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Tue Jan  6 12:21:17 2009
@@ -209,6 +209,7 @@
    HBASE-1106  Expose getClosestRowBefore in HTable
                (Michael Gottesman via Stack)
    HBASE-1082  Administrative functions for table/region maintenance
+   HBASE-1090  Atomic Check And Save in HTable (Michael Gottesman via Stack)
 
   NEW FEATURES
    HBASE-875   Use MurmurHash instead of JenkinsHash [in bloomfilters]

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=732094&r1=732093&r2=732094&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java Tue Jan  6 12:21:17 2009
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 
@@ -1335,6 +1336,33 @@
   }
   
   /**
+   * Atomically checks whether a row's current values match
+   * the expectedValues. If they do, applies the batchUpdate
+   * to the row.
+   * @param batchUpdate batch update to apply if the check succeeds
+   * @param expectedValues map of column names to expected values
+   * @param rl row lock, or null to let the server manage locking
+   * @return true if the check passed and the update was applied
+   * @throws IOException
+   */
+  public synchronized boolean checkAndSave(final BatchUpdate batchUpdate,
+    final HbaseMapWritable<byte[],byte[]> expectedValues, final RowLock rl)
+  throws IOException {
+    checkRowAndColumns(batchUpdate);
+    if(rl != null) {
+      batchUpdate.setRowLock(rl.getLockId());
+    }
+    return connection.getRegionServerWithRetries(
+      new ServerCallable<Boolean>(connection, tableName, batchUpdate.getRow()) {
+        public Boolean call() throws IOException {
+          return server.checkAndSave(location.getRegionInfo().getRegionName(),
+            batchUpdate, expectedValues)?
+              Boolean.TRUE: Boolean.FALSE;
+        }
+      }
+    ).booleanValue();
+  }
+  
+  /**
    * Commit to the table the buffer of BatchUpdate.
   * Called automatically in the commit methods when autoFlush is true.
    * @throws IOException

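For context, here is how a client might call the new API. This is a minimal sketch against the 0.19-era client API, not part of the commit; the table name, row key, column name, and values are hypothetical.

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckAndSaveExample {
  public static void main(String[] args) throws IOException {
    // Hypothetical table with a column family "info:"
    HTable table = new HTable(new HBaseConfiguration(), "mytable");

    // Expect the current value of info:balance to be 100 ...
    HbaseMapWritable<byte[], byte[]> expected =
      new HbaseMapWritable<byte[], byte[]>();
    expected.put(Bytes.toBytes("info:balance"), Bytes.toBytes(100));

    // ... and if so, atomically overwrite it with 90.
    BatchUpdate update = new BatchUpdate(Bytes.toBytes("row1"));
    update.put("info:balance", Bytes.toBytes(90));

    // Passing null for the RowLock lets the region server do the locking.
    boolean saved = table.checkAndSave(update, expected, null);
    System.out.println(saved ? "update applied" : "row changed; retry");
  }
}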
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=732094&r1=732093&r2=732094&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Tue Jan  6 12:21:17 2009
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
 
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.NotServingRegionException;
@@ -118,6 +119,21 @@
   throws IOException;
   
   /**
+   * Applies a batch of updates to one row atomically via one RPC,
+   * but only if the row's current values match the values given
+   * in expectedValues.
+   * 
+   * @param regionName name of the region to update
+   * @param b BatchUpdate to apply if the check succeeds
+   * @param expectedValues map of column names to expected data values
+   * @return true if the check passed and the update was applied
+   * @throws IOException
+   */
+  public boolean checkAndSave(final byte [] regionName, final BatchUpdate b,
+      final HbaseMapWritable<byte[],byte[]> expectedValues)
+  throws IOException;
+  
+  /**
    * Delete all cells that match the passed row and column and whose timestamp
    * is equal-to or older than the passed timestamp.
    * 

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=732094&r1=732093&r2=732094&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Jan  6 12:21:17 2009
@@ -1291,6 +1291,96 @@
     }
   }
 
+
+  /**
+   * Performs an atomic check and save operation: checks whether
+   * the row's current values still match the specified expected
+   * values and, if they do, applies the update.
+   * 
+   * @param b the update to apply
+   * @param expectedValues the expected values to check against
+   * @param lockid lock id for the row, or null to acquire one internally
+   * @param writeToWAL whether or not to write to the write ahead log
+   * @return true if the check passed and the update was applied
+   * @throws IOException
+   */
+  public boolean checkAndSave(BatchUpdate b,
+    HbaseMapWritable<byte[], byte[]> expectedValues, Integer lockid,
+    boolean writeToWAL)
+  throws IOException {
+    // This is basically a copy of batchUpdate with the atomic check and
+    // save added in, so read this method alongside batchUpdate. Comments
+    // below cover only the areas that differ; for everything else, see
+    // the comments in the batchUpdate method.
+    boolean success = true;
+    checkReadOnly();
+    checkResources();
+    splitsAndClosesLock.readLock().lock();
+    try {
+      byte[] row = b.getRow();
+      Integer lid = getLock(lockid,row);
+      try {
+        Set<byte[]> keySet = expectedValues.keySet();
+        Map<byte[],Cell> actualValues = this.getFull(row, keySet,
+          HConstants.LATEST_TIMESTAMP, 1, lid);
+        for (byte[] key : keySet) {
+          // If the column is missing or its value differs, fail the check
+          Cell actual = actualValues.get(key);
+          if (actual == null ||
+              !Bytes.equals(actual.getValue(), expectedValues.get(key))) {
+            success = false;
+            break;
+          }
+        }
+        
+        if (success) {
+          long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP)?
+            System.currentTimeMillis(): b.getTimestamp();
+          List<byte []> deletes = null;
+          for (BatchOperation op: b) {
+            HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime,
+                this.regionInfo);
+            byte[] val = null;
+            if (op.isPut()) {
+              val = op.getValue();
+              if (HLogEdit.isDeleted(val)) {
+                throw new IOException("Cannot insert value: " +
+                  Bytes.toString(val));
+              }
+            } else {
+              if (b.getTimestamp() == LATEST_TIMESTAMP) {
+                // Save off these deletes
+                if (deletes == null) {
+                  deletes = new ArrayList<byte []>();
+                }
+                deletes.add(op.getColumn());
+              } else {
+                val = HLogEdit.deleteBytes.get();
+              }
+            }
+            if (val != null) {
+              localput(lid, key, val);
+            }
+          }
+          TreeMap<HStoreKey, byte[]> edits =
+            this.targetColumns.remove(lid);
+          if (edits != null && edits.size() > 0) {
+            update(edits, writeToWAL);
+          }
+          if (deletes != null && deletes.size() > 0) {
+            // We have some LATEST_TIMESTAMP deletes to run.
+            for (byte [] column: deletes) {
+              deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
+            }
+          }
+        }
+      } catch (IOException e) {
+        this.targetColumns.remove(lid);
+        throw e;
+      } finally {
+        if(lockid == null) releaseRowLock(lid);
+      }
+    } finally {
+      splitsAndClosesLock.readLock().unlock();
+    }
+    return success;
+  }
+
   /*
    * Check if resources to support an update.
    * 

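The essential pattern in HRegion.checkAndSave above is compare-then-apply under a single row lock: the read of the current values and the conditional write both happen while the same lock is held, so no other writer can slip in between the check and the save. A stripped-down, self-contained sketch of that pattern (illustrative names only, not HBase API):

import java.util.Arrays;
import java.util.Map;

final class CheckThenApply {
  // Applies the update and returns true only if every expected value
  // still matches the row's current contents.
  static boolean checkAndApply(Map<String, byte[]> row,
      Map<String, byte[]> expected, Map<String, byte[]> update) {
    synchronized (row) { // stands in for the per-row lock
      for (Map.Entry<String, byte[]> e : expected.entrySet()) {
        if (!Arrays.equals(row.get(e.getKey()), e.getValue())) {
          return false; // any mismatch aborts the whole update
        }
      }
      row.putAll(update); // all checks passed; apply atomically
      return true;
    }
  }
}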
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=732094&r1=732093&r2=732094&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Jan  6 12:21:17 2009
@@ -1626,6 +1626,26 @@
     return -1;
   }
   
+  public boolean checkAndSave(final byte [] regionName, final BatchUpdate b,
+      final HbaseMapWritable<byte[],byte[]> expectedValues)
+  throws IOException {
+    if (b.getRow() == null) {
+      throw new IllegalArgumentException("update has null row");
+    }
+    checkOpen();
+    this.requestCount.incrementAndGet();
+    HRegion region = getRegion(regionName);
+    validateValuesLength(b, region);
+    try {
+      cacheFlusher.reclaimMemcacheMemory();
+      return region.checkAndSave(b, expectedValues,
+        getLockFromId(b.getRowLock()), false);
+    } catch (Throwable t) {
+      throw convertThrowableToIOE(cleanup(t));
+    }
+  }
+  
   /**
    * Utility method to verify values length
    * @param batchUpdate The update to verify

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestHTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestHTable.java?rev=732094&r1=732093&r2=732094&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestHTable.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestHTable.java Tue Jan  6 12:21:17 2009
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -48,6 +49,67 @@
   private static final byte [] attrName = Bytes.toBytes("TESTATTR");
   private static final byte [] attrValue = Bytes.toBytes("somevalue");
 
+  public void testCheckAndSave() throws IOException {
+    HTable table = null;
+    HColumnDescriptor column2 =
+      new HColumnDescriptor(Bytes.toBytes("info2:"));
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    HTableDescriptor testTableADesc =
+      new HTableDescriptor(tableAname);
+    testTableADesc.addFamily(column);
+    testTableADesc.addFamily(column2);
+    admin.createTable(testTableADesc);
+    
+    table = new HTable(conf, tableAname);
+    BatchUpdate batchUpdate = new BatchUpdate(row);
+    BatchUpdate batchUpdate2 = new BatchUpdate(row);
+    BatchUpdate batchUpdate3 = new BatchUpdate(row);
+    
+    HbaseMapWritable<byte[],byte[]> expectedValues =
+      new HbaseMapWritable<byte[],byte[]>();
+    HbaseMapWritable<byte[],byte[]> badExpectedValues =
+      new HbaseMapWritable<byte[],byte[]>();
+    
+    for(int i = 0; i < 5; i++) {
+      // This batch update is our initial one. We also set the expected
+      // values to the same values, since we will be comparing the two
+      batchUpdate.put(COLUMN_FAMILY_STR+i, Bytes.toBytes(i));
+      expectedValues.put(Bytes.toBytes(COLUMN_FAMILY_STR+i), Bytes.toBytes(i));
+      
+      badExpectedValues.put(Bytes.toBytes(COLUMN_FAMILY_STR+i),
+        Bytes.toBytes(500));
+      
+      // This second batch update will be used to overwrite the values
+      // written by the initial one
+      batchUpdate2.put(COLUMN_FAMILY_STR+i, Bytes.toBytes(i+1));
+      
+      // This final batch update is used to check that the original
+      // expected values (which will be stale by then) fail the check
+      batchUpdate3.put(COLUMN_FAMILY_STR+i, Bytes.toBytes(i+2));
+    }
+    
+    // Initialize rows
+    table.commit(batchUpdate);
+    
+    // check that incorrect expected values make checkAndSave return false
+    assertFalse(table.checkAndSave(batchUpdate2, badExpectedValues, null));
+    
+    // make sure the correct expected values let the update through
+    assertTrue(table.checkAndSave(batchUpdate2, expectedValues, null));
+        
+    // make sure check and save truly saves the data after checking the expected
+    // values
+    RowResult r = table.getRow(row);
+    byte[][] columns = batchUpdate2.getColumns();
+    for (int i = 0; i < columns.length; i++) {
+      assertTrue(Bytes.equals(r.get(columns[i]).getValue(),batchUpdate2.get(columns[i])));
+    }
+    
+    // make sure that the now-stale expected values fail the check
+    assertFalse(table.checkAndSave(batchUpdate3, expectedValues, null));
+  }
+
   /**
    * the test
    * @throws IOException


