hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r685391 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apache/hadoop/hbase/master/ src/java/org/apache/hadoop/hbase/regionserver/ sr...
Date Wed, 13 Aug 2008 00:07:32 GMT
Author: jimk
Date: Tue Aug 12 17:07:29 2008
New Revision: 685391

URL: http://svn.apache.org/viewvc?rev=685391&view=rev
Log:
HBASE-798  Provide Client API to explicitly lock and unlock rows (Jonathan Gray via Jim Kellerman)

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/BaseScanner.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ColumnOperation.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Merge.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MetaUtils.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Tue Aug 12 17:07:29 2008
@@ -32,6 +32,8 @@
 
   NEW FEATURES
    HBASE-787  Postgresql to HBase table replication example (Tim Sell via Stack)
+   HBASE-798  Provide Client API to explicitly lock and unlock rows (Jonathan
+              Gray via Jim Kellerman)
 
   OPTIMIZATIONS
 

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java Tue Aug 12 17:07:29 2008
@@ -373,7 +373,7 @@
         b.delete(COL_STARTCODE);
         b.delete(COL_SPLITA);
         b.delete(COL_SPLITB);
-        root.batchUpdate(b);
+        root.batchUpdate(b,null);
 
         if(LOG.isDebugEnabled()) {
           LOG.debug("updated columns in row: " + regionsToDelete[r]);
@@ -383,7 +383,7 @@
       newInfo.setOffline(true);
       BatchUpdate b = new BatchUpdate(newRegion.getRegionName());
       b.put(COL_REGIONINFO, Writables.getBytes(newInfo));
-      root.batchUpdate(b);
+      root.batchUpdate(b,null);
       if(LOG.isDebugEnabled()) {
         LOG.debug("updated columns in row: " + newRegion.getRegionName());
       }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java Tue Aug 12 17:07:29 2008
@@ -640,11 +640,32 @@
   public RowResult getRow(final byte [] row, final byte [][] columns, 
     final long ts) 
   throws IOException {       
+    return getRow(row,columns,ts,null);
+  }
+
+  /** 
+   * Get selected columns for the specified row at a specified timestamp
+   * using existing row lock.
+   * 
+   * @param row row key
+   * @param columns Array of column names and families you want to retrieve.
+   * @param ts timestamp
+   * @param rl row lock
+   * @return RowResult is empty if row does not exist.
+   * @throws IOException
+   */
+  public RowResult getRow(final byte [] row, final byte [][] columns, 
+    final long ts, final RowLock rl) 
+  throws IOException {       
     return connection.getRegionServerWithRetries(
         new ServerCallable<RowResult>(connection, tableName, row) {
           public RowResult call() throws IOException {
+            long lockId = -1L;
+            if(rl != null) {
+              lockId = rl.getLockId();
+            }
             return server.getRow(location.getRegionInfo().getRegionName(), row, 
-                columns, ts);
+                columns, ts, lockId);
           }
         }
     );
@@ -1104,15 +1125,35 @@
    */
   public void deleteAll(final byte [] row, final byte [] column, final long ts)
   throws IOException {
+    deleteAll(row,column,ts,null);
+  }
+
+  /** 
+   * Delete all cells that match the passed row and column and whose
+   * timestamp is equal-to or older than the passed timestamp, using an
+   * existing row lock.
+   * @param row Row to update
+   * @param column name of column whose value is to be deleted
+   * @param ts Delete all cells of the same timestamp or older.
+   * @param rl Existing row lock
+   * @throws IOException 
+   */
+  public void deleteAll(final byte [] row, final byte [] column, final long ts,
+      final RowLock rl)
+  throws IOException {
     connection.getRegionServerWithRetries(
         new ServerCallable<Boolean>(connection, tableName, row) {
           public Boolean call() throws IOException {
+            long lockId = -1L;
+            if(rl != null) {
+              lockId = rl.getLockId();
+            }
             if (column != null) {
               this.server.deleteAll(location.getRegionInfo().getRegionName(),
-                row, column, ts);
+                row, column, ts, lockId);
             } else {
               this.server.deleteAll(location.getRegionInfo().getRegionName(),
-                  row, ts);
+                  row, ts, lockId);
             }
             return null;
           }
@@ -1161,11 +1202,31 @@
   public void deleteFamily(final byte [] row, final byte [] family, 
     final long timestamp)
   throws IOException {
+    deleteFamily(row,family,timestamp,null);
+  }
+
+  /**
+   * Delete all cells for a row with matching column family with timestamps
+   * less than or equal to <i>timestamp</i>, using existing row lock.
+   *
+   * @param row The row to operate on
+   * @param family The column family to match
+   * @param timestamp Timestamp to match
+   * @param rl Existing row lock
+   * @throws IOException
+   */
+  public void deleteFamily(final byte [] row, final byte [] family, 
+    final long timestamp, final RowLock rl)
+  throws IOException {
     connection.getRegionServerWithRetries(
         new ServerCallable<Boolean>(connection, tableName, row) {
           public Boolean call() throws IOException {
+            long lockId = -1L;
+            if(rl != null) {
+              lockId = rl.getLockId();
+            }
             server.deleteFamily(location.getRegionInfo().getRegionName(), row, 
-                family, timestamp);
+                family, timestamp, lockId);
             return null;
           }
         }
@@ -1179,11 +1240,27 @@
    */ 
   public synchronized void commit(final BatchUpdate batchUpdate) 
   throws IOException {
+    commit(batchUpdate,null);
+  }
+  
+  /**
+   * Commit a BatchUpdate to the table using existing row lock.
+   * @param batchUpdate
+   * @param rl Existing row lock
+   * @throws IOException
+   */ 
+  public synchronized void commit(final BatchUpdate batchUpdate,
+      final RowLock rl) 
+  throws IOException {
     connection.getRegionServerWithRetries(
       new ServerCallable<Boolean>(connection, tableName, batchUpdate.getRow()) {
         public Boolean call() throws IOException {
+          long lockId = -1L;
+          if(rl != null) {
+            lockId = rl.getLockId();
+          }
           server.batchUpdate(location.getRegionInfo().getRegionName(), 
-            batchUpdate);
+            batchUpdate, lockId);
           return null;
         }
       }
@@ -1198,7 +1275,45 @@
   public synchronized void commit(final List<BatchUpdate> batchUpdates) 
   throws IOException {
     for (BatchUpdate batchUpdate : batchUpdates) 
-      commit(batchUpdate);
+      commit(batchUpdate,null);
+  }
+
+  /**
+   * Obtain a row lock
+   * @param row The row to lock
+   * @return rowLock RowLock containing row and lock id
+   * @throws IOException
+   */
+  public RowLock lockRow(final byte [] row)
+  throws IOException {
+    return connection.getRegionServerWithRetries(
+      new ServerCallable<RowLock>(connection, tableName, row) {
+        public RowLock call() throws IOException {
+          long lockId =
+              server.lockRow(location.getRegionInfo().getRegionName(), row);
+          RowLock rowLock = new RowLock(row,lockId);
+          return rowLock;
+        }
+      }
+    );
+  }
+
+  /**
+   * Release a row lock
+   * @param rl The row lock to release
+   * @throws IOException
+   */
+  public void unlockRow(final RowLock rl)
+  throws IOException {
+    connection.getRegionServerWithRetries(
+      new ServerCallable<Boolean>(connection, tableName, rl.getRow()) {
+        public Boolean call() throws IOException {
+          server.unlockRow(location.getRegionInfo().getRegionName(),
+              rl.getLockId());
+          return null;
+        }
+      }
+    );
   }
 
   /**

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Tue Aug 12 17:07:29 2008
@@ -111,11 +111,12 @@
    * 
    * @param regionName region name
    * @param row row key
+   * @param lockId lock id
    * @return map of values
    * @throws IOException
    */
   public RowResult getRow(final byte [] regionName, final byte [] row, 
-    final byte[][] columns, final long ts)
+    final byte[][] columns, final long ts, final long lockId)
   throws IOException;
 
   /**
@@ -123,9 +124,11 @@
    * 
    * @param regionName name of the region to update
    * @param b BatchUpdate
+   * @param lockId lock id
    * @throws IOException
    */
-  public void batchUpdate(final byte [] regionName, final BatchUpdate b)
+  public void batchUpdate(final byte [] regionName, final BatchUpdate b,
+      final long lockId)
   throws IOException;
   
   /**
@@ -136,10 +139,11 @@
    * @param row row key
    * @param column column key
    * @param timestamp Delete all entries that have this timestamp or older
+   * @param lockId lock id
    * @throws IOException
    */
   public void deleteAll(byte [] regionName, byte [] row, byte [] column,
-    long timestamp)
+    long timestamp, long lockId)
   throws IOException;
 
   /**
@@ -149,9 +153,11 @@
    * @param regionName region name
    * @param row row key
    * @param timestamp Delete all entries that have this timestamp or older
+   * @param lockId lock id
    * @throws IOException
    */
-  public void deleteAll(byte [] regionName, byte [] row, long timestamp)
+  public void deleteAll(byte [] regionName, byte [] row, long timestamp,
+      long lockId)
   throws IOException;
 
   /**
@@ -162,9 +168,10 @@
    * @param row The row to operate on
    * @param family The column family to match
    * @param timestamp Timestamp to match
+   * @param lockId lock id
    */
   public void deleteFamily(byte [] regionName, byte [] row, byte [] family, 
-    long timestamp)
+    long timestamp, long lockId)
   throws IOException;
 
   
@@ -207,4 +214,24 @@
    * @throws IOException
    */
   public void close(long scannerId) throws IOException;
+  
+  /**
+   * Opens a remote row lock.
+   *
+   * @param regionName name of region
+   * @param row row to lock
+   * @return lockId lock identifier
+   * @throws IOException
+   */
+  public long lockRow(final byte [] regionName, final byte [] row)
+  throws IOException;
+
+  /**
+   * Releases a remote row lock.
+   *
+   * @param lockId the lock id returned by lockRow
+   * @throws IOException
+   */
+  public void unlockRow(final byte [] regionName, final long lockId)
+  throws IOException;
 }
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/BaseScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/BaseScanner.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/BaseScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/BaseScanner.java Tue Aug 12 17:07:29 2008
@@ -332,7 +332,7 @@
     
     BatchUpdate b = new BatchUpdate(parent);
     b.delete(splitColumn);
-    srvr.batchUpdate(metaRegionName, b);
+    srvr.batchUpdate(metaRegionName, b, -1L);
       
     return result;
   }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java Tue Aug 12 17:07:29 2008
@@ -91,7 +91,7 @@
       updateRegionInfo(b, i);
       b.delete(COL_SERVER);
       b.delete(COL_STARTCODE);
-      server.batchUpdate(m.getRegionName(), b);
+      server.batchUpdate(m.getRegionName(), b, -1L);
       if (LOG.isDebugEnabled()) {
         LOG.debug("updated columns in row: " + i.getRegionNameAsString());
       }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ColumnOperation.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ColumnOperation.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ColumnOperation.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ColumnOperation.java Tue Aug 12 17:07:29 2008
@@ -52,7 +52,7 @@
   throws IOException {
     BatchUpdate b = new BatchUpdate(i.getRegionName());
     b.put(COL_REGIONINFO, Writables.getBytes(i));
-    server.batchUpdate(regionName, b);
+    server.batchUpdate(regionName, b, -1L);
     if (LOG.isDebugEnabled()) {
       LOG.debug("updated columns in row: " + i.getRegionNameAsString());
     }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java Tue Aug 12 17:07:29 2008
@@ -52,7 +52,7 @@
   throws IOException {
     BatchUpdate b = new BatchUpdate(i.getRegionName());
     b.put(COL_REGIONINFO, Writables.getBytes(i));
-    server.batchUpdate(regionName, b);
+    server.batchUpdate(regionName, b, -1L);
     LOG.debug("updated HTableDescriptor for region " + i.getRegionNameAsString());
   }
 

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java Tue Aug 12 17:07:29 2008
@@ -83,7 +83,7 @@
           BatchUpdate b = new BatchUpdate(regionInfo.getRegionName());
           b.put(COL_SERVER, Bytes.toBytes(serverAddress.toString()));
           b.put(COL_STARTCODE, startCode);
-          server.batchUpdate(metaRegionName, b);
+          server.batchUpdate(metaRegionName, b, -1L);
           if (!this.historian.isOnline()) {
             // This is safest place to do the onlining of the historian in
             // the master.  When we get to here, we know there is a .META.

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java Tue Aug 12 17:07:29 2008
@@ -560,7 +560,7 @@
     byte [] regionName = region.getRegionName();
     BatchUpdate b = new BatchUpdate(regionName);
     b.put(COL_REGIONINFO, Writables.getBytes(info));
-    server.batchUpdate(metaRegionName, b);
+    server.batchUpdate(metaRegionName, b, -1L);
     
     // 4. Close the new region to flush it to disk.  Close its log file too.
     region.close();

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Aug 12 17:07:29 2008
@@ -1,4 +1,4 @@
-/**
+  /**
  * Copyright 2007 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -1170,7 +1170,7 @@
    * @throws IOException
    */
   public Map<byte [], Cell> getFull(final byte [] row,
-      final Set<byte []> columns, final long ts) 
+      final Set<byte []> columns, final long ts, final Integer lockid) 
   throws IOException {
     // Check columns passed
     if (columns != null) {
@@ -1179,7 +1179,7 @@
       }
     }
     HStoreKey key = new HStoreKey(row, ts);
-    Integer lid = obtainRowLock(row);
+    Integer lid = getLock(lockid,row);
     HashSet<HStore> storeSet = new HashSet<HStore>();
     try {
       TreeMap<byte [], Cell> result =
@@ -1215,7 +1215,7 @@
       
       return result;
     } finally {
-      releaseRowLock(lid);
+      if(lockid == null) releaseRowLock(lid);
     }
   }
 
@@ -1347,7 +1347,7 @@
    * @param b
    * @throws IOException
    */
-  public void batchUpdate(BatchUpdate b)
+  public void batchUpdate(BatchUpdate b, Integer lockid)
   throws IOException {
     checkReadOnly();
 
@@ -1363,7 +1363,8 @@
     // See HRegionServer#RegionListener for how the expire on HRegionServer
     // invokes a HRegion#abort.
     byte [] row = b.getRow();
-    Integer lid = obtainRowLock(row);
+    // If we did not pass an existing row lock, obtain a new one
+    Integer lid = getLock(lockid,row);
     long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ?
       System.currentTimeMillis() : b.getTimestamp();
     try {
@@ -1408,7 +1409,7 @@
       this.targetColumns.remove(Long.valueOf(lid));
       throw e;
     } finally {
-      releaseRowLock(lid);
+      if(lockid == null) releaseRowLock(lid);
     }
   }
 
@@ -1458,17 +1459,19 @@
    * @param row
    * @param column
    * @param ts Delete all entries that have this timestamp or older
+   * @param lockid Row lock
    * @throws IOException
    */
-  public void deleteAll(final byte [] row, final byte [] column, final long ts)
+  public void deleteAll(final byte [] row, final byte [] column, final long ts,
+      final Integer lockid)
   throws IOException {
     checkColumn(column);
     checkReadOnly();
-    Integer lid = obtainRowLock(row);
+    Integer lid = getLock(lockid,row);
     try {
       deleteMultiple(row, column, ts, ALL_VERSIONS);
     } finally {
-      releaseRowLock(lid);
+      if(lockid == null) releaseRowLock(lid);
     }
   }
 
@@ -1476,12 +1479,14 @@
    * Delete all cells of the same age as the passed timestamp or older.
    * @param row
    * @param ts Delete all entries that have this timestamp or older
+   * @param lockid Row lock
    * @throws IOException
    */
-  public void deleteAll(final byte [] row, final long ts)
+  public void deleteAll(final byte [] row, final long ts,
+      final Integer lockid)
   throws IOException {
     checkReadOnly();
-    Integer lid = obtainRowLock(row);    
+    Integer lid = getLock(lockid,row);
     try {
       for (HStore store : stores.values()){
         List<HStoreKey> keys = store.getKeys(new HStoreKey(row, ts),
@@ -1493,7 +1498,7 @@
         update(edits);
       }
     } finally {
-      releaseRowLock(lid);
+      if(lockid == null) releaseRowLock(lid);
     }
   }
 
@@ -1504,12 +1509,14 @@
    * @param row The row to operate on
    * @param family The column family to match
    * @param timestamp Timestamp to match
+   * @param lockid Row lock
    * @throws IOException
    */
-  public void deleteFamily(byte [] row, byte [] family, long timestamp)
+  public void deleteFamily(byte [] row, byte [] family, long timestamp,
+      final Integer lockid)
   throws IOException{
     checkReadOnly();
-    Integer lid = obtainRowLock(row);    
+    Integer lid = getLock(lockid,row);
     try {
       // find the HStore for the column family
       HStore store = getStore(family);
@@ -1522,7 +1529,7 @@
       }
       update(edits);
     } finally {
-      releaseRowLock(lid);
+      if(lockid == null) releaseRowLock(lid);
     }
   }
   
@@ -1552,7 +1559,7 @@
       update(edits);
     }
   }
-  
+    
   /**
    * @throws IOException Throws exception if region is in read-only mode.
    */
@@ -1778,6 +1785,41 @@
     }
   }
   
+  /**
+   * See if row is currently locked.
+   * @param lockid
+   * @return boolean
+   */
+  private boolean isRowLocked(final Integer lockid) {
+    synchronized (locksToRows) {
+      if(locksToRows.containsKey(lockid)) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+  
+  /**
+   * Returns existing row lock if found, otherwise
+   * obtains a new row lock and returns it.
+   * @param lockid
+   * @return lockid
+   */
+  private Integer getLock(Integer lockid, byte [] row) 
+  throws IOException {
+    Integer lid = null;
+    if(lockid == null) {
+      lid = obtainRowLock(row);
+    } else {
+      if(!isRowLocked(lockid)) {
+        throw new IOException("Invalid row lock");
+      }
+      lid = lockid;
+    }
+    return lid;
+  }
+  
   private void waitOnRowLocks() {
     synchronized (locksToRows) {
       while (this.locksToRows.size() > 0) {
@@ -2134,7 +2176,8 @@
   public static void removeRegionFromMETA(final HRegionInterface srvr,
     final byte [] metaRegionName, final byte [] regionName)
   throws IOException {
-    srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP);
+    srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP,
+        (long)-1L);
   }
 
   /**
@@ -2155,7 +2198,7 @@
     b.delete(COL_STARTCODE);
     // If carrying splits, they'll be in place when we show up on new
     // server.
-    srvr.batchUpdate(metaRegionName, b);
+    srvr.batchUpdate(metaRegionName, b, (long)-1L);
   }
 
   /**

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Aug 12 17:07:29 2008
@@ -69,6 +69,7 @@
 import org.apache.hadoop.hbase.RegionHistorian;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.UnknownRowLockException;
 import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.BatchOperation;
@@ -1048,7 +1049,7 @@
 
   /** {@inheritDoc} */
   public RowResult getRow(final byte [] regionName, final byte [] row, 
-    final byte [][] columns, final long ts)
+    final byte [][] columns, final long ts, final long lockId)
   throws IOException {
     checkOpen();
     requestCount.incrementAndGet();
@@ -1061,7 +1062,8 @@
       }
       
       HRegion region = getRegion(regionName);
-      Map<byte [], Cell> map = region.getFull(row, columnSet, ts);
+      Map<byte [], Cell> map = region.getFull(row, columnSet, ts,
+          getLockFromId(lockId));
       HbaseMapWritable<byte [], Cell> result =
         new HbaseMapWritable<byte [], Cell>();
       result.putAll(map);
@@ -1126,7 +1128,7 @@
   }
 
   /** {@inheritDoc} */
-  public void batchUpdate(final byte [] regionName, BatchUpdate b)
+  public void batchUpdate(final byte [] regionName, BatchUpdate b, long lockId)
   throws IOException {
     checkOpen();
     this.requestCount.incrementAndGet();
@@ -1134,7 +1136,7 @@
     validateValuesLength(b, region);
     try {
       cacheFlusher.reclaimMemcacheMemory();
-      region.batchUpdate(b);
+      region.batchUpdate(b, getLockFromId(lockId));
     } catch (OutOfMemoryError error) {
       abort();
       LOG.fatal("Ran out of memory", error);
@@ -1239,7 +1241,7 @@
   }
 
   Map<String, InternalScanner> scanners =
-    Collections.synchronizedMap(new HashMap<String, InternalScanner>());
+    new ConcurrentHashMap<String, InternalScanner>();
 
   /** 
    * Instantiated as a scanner lease.
@@ -1275,26 +1277,157 @@
   
   /** {@inheritDoc} */
   public void deleteAll(final byte [] regionName, final byte [] row,
-      final byte [] column, final long timestamp) 
+      final byte [] column, final long timestamp, final long lockId) 
   throws IOException {
     HRegion region = getRegion(regionName);
-    region.deleteAll(row, column, timestamp);
+    region.deleteAll(row, column, timestamp, getLockFromId(lockId));
   }
 
   /** {@inheritDoc} */
   public void deleteAll(final byte [] regionName, final byte [] row,
-      final long timestamp) 
+      final long timestamp, final long lockId) 
   throws IOException {
     HRegion region = getRegion(regionName);
-    region.deleteAll(row, timestamp);
+    region.deleteAll(row, timestamp, getLockFromId(lockId));
   }
 
   /** {@inheritDoc} */
   public void deleteFamily(byte [] regionName, byte [] row, byte [] family, 
-    long timestamp) throws IOException{
-    getRegion(regionName).deleteFamily(row, family, timestamp);
+    long timestamp, final long lockId)
+  throws IOException{
+    getRegion(regionName).deleteFamily(row, family, timestamp,
+        getLockFromId(lockId));
   }
 
+  /** {@inheritDoc} */
+  public long lockRow(byte [] regionName, byte [] row)
+  throws IOException {
+    checkOpen();
+    NullPointerException npe = null;
+    if(regionName == null) {
+      npe = new NullPointerException("regionName is null");
+    } else if(row == null) {
+      npe = new NullPointerException("row to lock is null");
+    }
+    if(npe != null) {
+      IOException io = new IOException("Invalid arguments to lockRow");
+      io.initCause(npe);
+      throw io;
+    }
+    requestCount.incrementAndGet();
+    try {
+      HRegion region = getRegion(regionName);
+      Integer r = region.obtainRowLock(row);
+      long lockId = addRowLock(r,region);
+      LOG.debug("Row lock " + lockId + " explicitly acquired by client");
+      return lockId;
+    } catch (IOException e) {
+      LOG.error("Error obtaining row lock (fsOk: " + this.fsOk + ")",
+          RemoteExceptionHandler.checkIOException(e));
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  protected long addRowLock(Integer r, HRegion region) throws LeaseStillHeldException {
+    long lockId = -1L;
+    lockId = rand.nextLong();
+    String lockName = String.valueOf(lockId);
+    synchronized(rowlocks) {
+      rowlocks.put(lockName, r);
+    }
+    this.leases.
+      createLease(lockName, new RowLockListener(lockName, region));
+    return lockId;
+  }
+
+  /**
+   * Method to get the Integer lock identifier used internally
+   * from the long lock identifier used by the client.
+   * @param lockId long row lock identifier from client
+   * @return intId Integer row lock used internally in HRegion
+   * @throws IOException Thrown if this is not a valid client lock id.
+   */
+  private Integer getLockFromId(long lockId)
+  throws IOException {
+    if(lockId == -1L) {
+      return null;
+    }
+    String lockName = String.valueOf(lockId);
+    Integer rl = null;
+    synchronized(rowlocks) {
+      rl = rowlocks.get(lockName);
+    }
+    if(rl == null) {
+      throw new IOException("Invalid row lock");
+    }
+    this.leases.renewLease(lockName);
+    return rl;
+  }
+
+  /** {@inheritDoc} */
+  public void unlockRow(byte [] regionName, long lockId)
+  throws IOException {
+    checkOpen();
+    NullPointerException npe = null;
+    if(regionName == null) {
+      npe = new NullPointerException("regionName is null");
+    } else if(lockId == -1L) {
+      npe = new NullPointerException("lockId is null");
+    }
+    if(npe != null) {
+      IOException io = new IOException("Invalid arguments to unlockRow");
+      io.initCause(npe);
+      throw io;
+    }
+    requestCount.incrementAndGet();
+    try {
+      HRegion region = getRegion(regionName);
+      String lockName = String.valueOf(lockId);
+      Integer r = null;
+      synchronized(rowlocks) {
+        r = rowlocks.remove(lockName);
+      }
+      if(r == null) {
+        throw new UnknownRowLockException(lockName);
+      }
+      region.releaseRowLock(r);
+      this.leases.cancelLease(lockName);
+      LOG.debug("Row lock " + lockId + " has been explicitly released by client");
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  Map<String, Integer> rowlocks =
+    new ConcurrentHashMap<String, Integer>();
+
+  /**
+   * Instantiated as a row lock lease.
+   * If the lease times out, the row lock is released
+   */
+  private class RowLockListener implements LeaseListener {
+    private final String lockName;
+    private final HRegion region;
+
+    RowLockListener(final String lockName, final HRegion region) {
+      this.lockName = lockName;
+      this.region = region;
+    }
+
+    /** {@inheritDoc} */
+    public void leaseExpired() {
+      LOG.info("Row Lock " + this.lockName + " lease expired");
+      Integer r = null;
+      synchronized(rowlocks) {
+        r = rowlocks.remove(this.lockName);
+      }
+      if(r != null) {
+        region.releaseRowLock(r);
+      }
+    }
+  }
 
   /**
    * @return Info on this server.

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Merge.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Merge.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Merge.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Merge.java Tue Aug 12 17:07:29 2008
@@ -308,7 +308,7 @@
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing region: " + regioninfo + " from " + meta);
     }
-    meta.deleteAll(regioninfo.getRegionName(), System.currentTimeMillis());
+    meta.deleteAll(regioninfo.getRegionName(), System.currentTimeMillis(), null);
   }
 
   /*

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MetaUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MetaUtils.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MetaUtils.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MetaUtils.java Tue Aug 12 17:07:29 2008
@@ -407,7 +407,7 @@
     }
     BatchUpdate b = new BatchUpdate(hri.getRegionName());
     b.put(HConstants.COL_REGIONINFO, Writables.getBytes(hri));
-    r.batchUpdate(b);
+    r.batchUpdate(b, null);
     if (LOG.isDebugEnabled()) {
       HRegionInfo h = Writables.getHRegionInfoOrNull(
           r.get(hri.getRegionName(), HConstants.COL_REGIONINFO).getValue());

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java?rev=685391&r1=685390&r2=685391&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/migration/v5/HRegion.java Tue Aug 12 17:07:29 2008
@@ -2050,7 +2050,8 @@
   public static void removeRegionFromMETA(final HRegionInterface srvr,
     final byte [] metaRegionName, final byte [] regionName)
   throws IOException {
-    srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP);
+    srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP, 
+        -1L);
   }
 
   /**
@@ -2071,7 +2072,7 @@
     b.delete(COL_STARTCODE);
     // If carrying splits, they'll be in place when we show up on new
     // server.
-    srvr.batchUpdate(metaRegionName, b);
+    srvr.batchUpdate(metaRegionName, b, -1L);
   }
 
   /**



Mime
View raw message