hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r556754 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/io/ src/test/org/apache/hadoop/hbase/
Date Mon, 16 Jul 2007 22:20:02 GMT
Author: jimk
Date: Mon Jul 16 15:19:59 2007
New Revision: 556754

URL: http://svn.apache.org/viewvc?view=rev&rev=556754
Log:
HADOOP-1468 Add HBase batch update to reduce RPC overhead

Added:
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?view=diff&rev=556754&r1=556753&r2=556754
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Mon Jul 16 15:19:59 2007
@@ -63,3 +63,5 @@
  39. HADOOP-1581 Un-openable tablename bug
  40. HADOOP-1607 [shell] Clear screen command (Edward Yoon via Stack)
  41. HADOOP-1614 [hbase] HClient does not protect itself from simultaneous updates
+ 42. HADOOP-1468 Add HBase batch update to reduce RPC overhead
+

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java?view=diff&rev=556754&r1=556753&r2=556754
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java Mon Jul 16 15:19:59 2007
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Random;
@@ -33,6 +34,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.KeyedData;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.Text;
@@ -62,10 +64,102 @@
   private long currentLockId;
   private Class<? extends HRegionInterface> serverInterfaceClass;
   
+  protected class BatchHandler {
+    private HashMap<RegionLocation, BatchUpdate> regionToBatch;
+    private HashMap<Long, BatchUpdate> lockToBatch;
+    
+    /** constructor */
+    public BatchHandler() {
+      this.regionToBatch = new HashMap<RegionLocation, BatchUpdate>();
+      this.lockToBatch = new HashMap<Long, BatchUpdate>();
+    }
+    
+    /** 
+     * Start a batch row insertion/update.
+     * 
+     * Manages multiple batch updates that are targeted for multiple regions,
+     * should the rows span several regions (possibly on different servers).
+     * 
+     * No changes are committed until the client commits the batch operation via
+     * HClient.commitBatch().
+     * 
+     * The entire batch update can be abandoned by calling HClient.abortBatch().
+     *
+     * Callers to this method are given a handle that corresponds to the row being
+     * changed. The handle must be supplied on subsequent put or delete calls so
+     * that the row can be identified.
+     * 
+     * @param row Name of row to start update against.
+     * @return Row lockid.
+     */
+    public synchronized long startUpdate(Text row) {
+      RegionLocation info = getRegionLocation(row);
+      BatchUpdate batch = regionToBatch.get(info);
+      if(batch == null) {
+        batch = new BatchUpdate();
+        regionToBatch.put(info, batch);
+      }
+      long lockid = batch.startUpdate(row);
+      lockToBatch.put(lockid, batch);
+      return lockid;
+    }
+    
+    /**
+     * Change the value for the specified column
+     * 
+     * @param lockid lock id returned from startUpdate
+     * @param column column whose value is being set
+     * @param value new value for column
+     */
+    public synchronized void put(long lockid, Text column, byte[] value) {
+      BatchUpdate batch = lockToBatch.get(lockid);
+      if (batch == null) {
+        throw new IllegalArgumentException("invalid lock id " + lockid);
+      }
+      batch.put(lockid, column, value);
+    }
+    
+    /** 
+     * Delete the value for a column
+     *
+     * @param lockid              - lock id returned from startUpdate
+     * @param column              - name of column whose value is to be deleted
+     */
+    public synchronized void delete(long lockid, Text column) {
+      BatchUpdate batch = lockToBatch.get(lockid);
+      if (batch == null) {
+        throw new IllegalArgumentException("invalid lock id " + lockid);
+      }
+      batch.delete(lockid, column);
+    }
+    
+    /** 
+     * Finalize a batch mutation
+     *
+     * @param timestamp time to associate with all the changes
+     * @throws IOException
+     */
+    public synchronized void commit(long timestamp) throws IOException {
+      try {
+        for(Map.Entry<RegionLocation, BatchUpdate> e: regionToBatch.entrySet()) {
+          RegionLocation r = e.getKey();
+          HRegionInterface server = getHRegionConnection(r.serverAddress);
+          server.batchUpdate(r.regionInfo.getRegionName(), timestamp,
+              e.getValue());
+        }
+      } catch (RemoteException e) {
+        throw RemoteExceptionHandler.decodeRemoteException(e);
+      }
+    }
+  }
+
+  private BatchHandler batch;
+  
   /*
    * Data structure that holds current location for a region and its info.
    */
-  protected static class RegionLocation {
+  @SuppressWarnings("unchecked")
+  protected static class RegionLocation implements Comparable {
     HRegionInfo regionInfo;
     HServerAddress serverAddress;
 
@@ -84,18 +178,48 @@
     }
     
     /**
-     * @return HRegionInfo
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean equals(Object o) {
+      return this.compareTo(o) == 0;
+    }
+    
+    /**
+     * {@inheritDoc}
      */
+    @Override
+    public int hashCode() {
+      int result = this.regionInfo.hashCode();
+      result ^= this.serverAddress.hashCode();
+      return result;
+    }
+    
+    /** @return HRegionInfo */
     public HRegionInfo getRegionInfo(){
       return regionInfo;
     }
 
-    /**
-     * @return HServerAddress
-     */
+    /** @return HServerAddress */
     public HServerAddress getServerAddress(){
       return serverAddress;
     }
+
+    //
+    // Comparable
+    //
+    
+    /**
+     * {@inheritDoc}
+     */
+    public int compareTo(Object o) {
+      RegionLocation other = (RegionLocation) o;
+      int result = this.regionInfo.compareTo(other.regionInfo);
+      if(result == 0) {
+        result = this.serverAddress.compareTo(other.serverAddress);
+      }
+      return result;
+    }
   }
   
   // Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress)
@@ -121,6 +245,7 @@
    */
   public HClient(Configuration conf) {
     this.conf = conf;
+    this.batch = null;
     this.currentLockId = -1;
 
     this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
@@ -222,8 +347,6 @@
    * 
    * @param desc table descriptor for table
    * 
-   * @throws RemoteException if exception occurred on remote side of
-   * connection.
    * @throws IllegalArgumentException if the table name is reserved
    * @throws MasterNotRunningException if master is not running
    * @throws NoServerForRegionException if root region is not being served
@@ -254,8 +377,6 @@
    * 
    * @param desc table descriptor for table
    * 
-   * @throws RemoteException if exception occurred on remote side of
-   * connection.
    * @throws IllegalArgumentException if the table name is reserved
    * @throws MasterNotRunningException if master is not running
    * @throws NoServerForRegionException if root region is not being served
@@ -265,7 +386,7 @@
    * @throws IOException
    */
   public synchronized void createTableAsync(HTableDescriptor desc)
-  throws IOException {
+      throws IOException {
     checkReservedTableName(desc.getName());
     checkMaster();
     try {
@@ -610,7 +731,7 @@
     if(tableName == null || tableName.getLength() == 0) {
       throw new IllegalArgumentException("table name cannot be null or zero length");
     }
-    if(this.currentLockId != -1) {
+    if(this.currentLockId != -1 || batch != null) {
       throw new IllegalStateException("update in progress");
     }
     this.tableServers = getTableServers(tableName);
@@ -1316,6 +1437,59 @@
   }
 
   /** 
+   * Start a batch of row insertions/updates.
+   * 
+   * No changes are committed until the call to commitBatch returns.
+   * A call to abortBatch will abandon the entire batch.
+   * 
+   * Note that in batch mode, calls to commit or abort are ignored.
+   */
+  public synchronized void startBatchUpdate() {
+    if(this.tableServers == null) {
+      throw new IllegalStateException("Must open table first");
+    }
+    
+    if(batch == null) {
+      batch = new BatchHandler();
+    }
+  }
+  
+  /** 
+   * Abort a batch mutation
+   */
+  public synchronized void abortBatch() {
+    batch = null;
+  }
+  
+  /** 
+   * Finalize a batch mutation
+   *
+   * @throws IOException
+   */
+  public synchronized void commitBatch() throws IOException {
+    commitBatch(System.currentTimeMillis());
+  }
+
+  /** 
+   * Finalize a batch mutation
+   *
+   * @param timestamp time to associate with all the changes
+   * @throws IOException
+   */
+  public synchronized void commitBatch(long timestamp) throws IOException {
+    if(batch == null) {
+      throw new IllegalStateException("no batch update in progress");
+    }
+    
+    try {
+      batch.commit(timestamp);
+      
+    } finally {
+      batch = null;
+    }
+  }
+  
+  /** 
    * Start an atomic row insertion/update.  No changes are committed until the 
    * call to commit() returns. A call to abort() will abandon any updates in progress.
    *
@@ -1334,6 +1508,9 @@
     if(this.currentLockId != -1) {
       throw new IllegalStateException("update in progress");
     }
+    if(batch != null) {
+      return batch.startUpdate(row);
+    }
     for(int tries = 0; tries < numRetries; tries++) {
       IOException e = null;
       RegionLocation info = getRegionLocation(row);
@@ -1379,6 +1556,14 @@
    * @throws IOException
    */
   public void put(long lockid, Text column, byte val[]) throws IOException {
+    if(val == null) {
+      throw new IllegalArgumentException("value cannot be null");
+    }
+    if(batch != null) {
+      batch.put(lockid, column, val);
+      return;
+    }
+    
     if(lockid != this.currentLockId) {
       throw new IllegalArgumentException("invalid lockid");
     }
@@ -1408,6 +1593,11 @@
    * @throws IOException
    */
   public void delete(long lockid, Text column) throws IOException {
+    if(batch != null) {
+      batch.delete(lockid, column);
+      return;
+    }
+    
     if(lockid != this.currentLockId) {
       throw new IllegalArgumentException("invalid lockid");
     }
@@ -1436,6 +1626,10 @@
    * @throws IOException
    */
   public void abort(long lockid) throws IOException {
+    if(batch != null) {
+      return;
+    }
+    
     if(lockid != this.currentLockId) {
       throw new IllegalArgumentException("invalid lockid");
     }
@@ -1471,6 +1665,10 @@
    * @throws IOException
    */
   public void commit(long lockid, long timestamp) throws IOException {
+    if(batch != null) {
+      return;
+    }
+    
     if(lockid != this.currentLockId) {
       throw new IllegalArgumentException("invalid lockid");
     }
@@ -1497,6 +1695,13 @@
    * @throws IOException
    */
   public void renewLease(long lockid) throws IOException {
+    if(batch != null) {
+      return;
+    }
+    
+    if(lockid != this.currentLockId) {
+      throw new IllegalArgumentException("invalid lockid");
+    }
     try {
       this.currentServer.renewLease(lockid, this.clientid);
     } catch(IOException e) {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java?view=diff&rev=556754&r1=556753&r2=556754
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java Mon Jul 16 15:19:59 2007
@@ -34,13 +34,39 @@
  * HRegions' table descriptor, etc.
  */
 public class HRegionInfo implements WritableComparable {
+  /** delimiter used between portions of a region name */
+  public static final char DELIMITER = ',';
+  
+  /**
+   * Extracts table name prefix from a region name.
+   * Presumes region names are ASCII characters only.
+   * @param regionName A region name.
+   * @return The table prefix of a region name.
+   */
+  public static Text getTableNameFromRegionName(final Text regionName) {
+    int index = -1;
+    byte [] bytes = regionName.getBytes();
+    for (int i = 0; i < bytes.length; i++) {
+      if (((char) bytes[i]) == DELIMITER) {
+        index = i;
+        break;
+      }
+    }
+    if (index == -1) {
+      throw new IllegalArgumentException(regionName.toString() + " does not " +
+        "contain " + DELIMITER + " character");
+    }
+    byte [] tableName = new byte[index];
+    System.arraycopy(bytes, 0, tableName, 0, index);
+    return new Text(tableName);
+  }
+
   Text regionName;
   long regionId;
   Text startKey;
   Text endKey;
   boolean offLine;
   HTableDescriptor tableDesc;
-  public static final char DELIMITER = ',';
   
   /** Default constructor - creates empty object */
   public HRegionInfo() {
@@ -100,6 +126,34 @@
     this.offLine = false;
   }
   
+  /** @return the endKey */
+  public Text getEndKey(){
+    return endKey;
+  }
+
+  /** @return the regionId */
+  public long getRegionId(){
+    return regionId;
+  }
+
+  /** @return the regionName */
+  public Text getRegionName(){
+    return regionName;
+  }
+
+  /** @return the startKey */
+  public Text getStartKey(){
+    return startKey;
+  }
+
+  /** @return the tableDesc */
+  public HTableDescriptor getTableDesc(){
+    return tableDesc;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public String toString() {
     return "regionname: " + this.regionName.toString() + ", startKey: <" +
@@ -107,11 +161,17 @@
       this.tableDesc.toString() + "}";
   }
     
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public boolean equals(Object o) {
     return this.compareTo(o) == 0;
   }
   
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public int hashCode() {
     int result = this.regionName.hashCode();
@@ -123,10 +183,13 @@
     return result;
   }
 
-  //////////////////////////////////////////////////////////////////////////////
+  //
   // Writable
-  //////////////////////////////////////////////////////////////////////////////
+  //
 
+  /**
+   * {@inheritDoc}
+   */
   public void write(DataOutput out) throws IOException {
     out.writeLong(regionId);
     tableDesc.write(out);
@@ -136,6 +199,9 @@
     out.writeBoolean(offLine);
   }
   
+  /**
+   * {@inheritDoc}
+   */
   public void readFields(DataInput in) throws IOException {
     this.regionId = in.readLong();
     this.tableDesc.readFields(in);
@@ -145,69 +211,13 @@
     this.offLine = in.readBoolean();
   }
   
-  /**
-   * @return the endKey
-   */
-  public Text getEndKey(){
-    return endKey;
-  }
-
-  /**
-   * @return the regionId
-   */
-  public long getRegionId(){
-    return regionId;
-  }
-
-  /**
-   * @return the regionName
-   */
-  public Text getRegionName(){
-    return regionName;
-  }
+  //
+  // Comparable
+  //
   
   /**
-   * Extracts table name prefix from a region name.
-   * Presumes region names are ASCII characters only.
-   * @param regionName A region name.
-   * @return The table prefix of a region name.
+   * {@inheritDoc}
    */
-  public static Text getTableNameFromRegionName(final Text regionName) {
-    int index = -1;
-    byte [] bytes = regionName.getBytes();
-    for (int i = 0; i < bytes.length; i++) {
-      if (((char) bytes[i]) == DELIMITER) {
-        index = i;
-        break;
-      }
-    }
-    if (index == -1) {
-      throw new IllegalArgumentException(regionName.toString() + " does not " +
-        "contain " + DELIMITER + " character");
-    }
-    byte [] tableName = new byte[index];
-    System.arraycopy(bytes, 0, tableName, 0, index);
-    return new Text(tableName);
-  }
-
-  /**
-   * @return the startKey
-   */
-  public Text getStartKey(){
-    return startKey;
-  }
-
-  /**
-   * @return the tableDesc
-   */
-  public HTableDescriptor getTableDesc(){
-    return tableDesc;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Comparable
-  /////////////////////////////////////////////////////////////////////////////
-  
   public int compareTo(Object o) {
     HRegionInfo other = (HRegionInfo) o;
     

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java?view=diff&rev=556754&r1=556753&r2=556754
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java Mon Jul 16 15:19:59 2007
@@ -22,6 +22,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.KeyedData;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.VersionedProtocol;
@@ -211,6 +212,16 @@
       long timestamp, RowFilterInterface filter)
   throws IOException;
 
+  /**
+   * Applies a batch of updates via one RPC
+   * 
+   * @param regionName name of the region to update
+   * @param timestamp the time to be associated with the changes
+   * @param b BatchUpdate
+   * @throws IOException
+   */
+  public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) throws IOException;
+  
   /**
    * Get the next set of values
    * 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=556754&r1=556753&r2=556754
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Mon Jul 16 15:19:59 2007
@@ -38,6 +38,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.BatchOperation;
 import org.apache.hadoop.hbase.io.KeyedData;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RemoteException;
@@ -987,6 +989,29 @@
     return getRegion(regionName).getRegionInfo();
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) throws IOException {
+    for(Map.Entry<Text, ArrayList<BatchOperation>> e: b) {
+      Text row = e.getKey();
+      long clientid = rand.nextLong();
+      long lockid = startUpdate(regionName, clientid, row);
+      for(BatchOperation op: e.getValue()) {
+        switch(op.getOp()) {
+        case BatchOperation.PUT_OP:
+          put(regionName, clientid, lockid, op.getColumn(), op.getValue());
+          break;
+          
+        case BatchOperation.DELETE_OP:
+          delete(regionName, clientid, lockid, op.getColumn());
+          break;
+        }
+      }
+      commit(regionName, clientid, lockid, timestamp);
+    }
+  }
+  
   /**
    * {@inheritDoc}
    */

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java?view=diff&rev=556754&r1=556753&r2=556754
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java Mon Jul 16 15:19:59 2007
@@ -24,25 +24,35 @@
 import java.io.*;
 import java.net.InetSocketAddress;
 
-/*******************************************************************************
+/**
  * HServerAddress is a "label" for a HBase server that combines the host
  * name and port number.
- ******************************************************************************/
-public class HServerAddress implements Writable {
+ */
+public class HServerAddress implements WritableComparable {
   private InetSocketAddress address;
-  private String stringValue;
+  String stringValue;
 
+  /** Empty constructor, used for Writable */
   public HServerAddress() {
     this.address = null;
     this.stringValue = null;
   }
-  
+
+  /**
+   * Construct a HServerAddress from an InetSocketAddress
+   * @param address InetSocketAddress of server
+   */
   public HServerAddress(InetSocketAddress address) {
     this.address = address;
     this.stringValue = address.getAddress().getHostAddress() + ":" +
       address.getPort();
   }
   
+  /**
+   * Construct a HServerAddress from a string of the form hostname:port
+   * 
+   * @param hostAndPort format 'hostname:port'
+   */
   public HServerAddress(String hostAndPort) {
     int colonIndex = hostAndPort.indexOf(':');
     if(colonIndex < 0) {
@@ -55,38 +65,76 @@
     this.stringValue = hostAndPort;
   }
   
+  /**
+   * Construct a HServerAddress from hostname, port number
+   * @param bindAddress host name
+   * @param port port number
+   */
   public HServerAddress(String bindAddress, int port) {
     this.address = new InetSocketAddress(bindAddress, port);
     this.stringValue = bindAddress + ":" + port;
   }
   
+  /**
+   * Construct a HServerAddress from another HServerAddress
+   * 
+   * @param other the HServerAddress to copy from
+   */
   public HServerAddress(HServerAddress other) {
     String bindAddress = other.getBindAddress();
     int port = other.getPort();
     address = new InetSocketAddress(bindAddress, port);
     stringValue = bindAddress + ":" + port;
   }
-  
+
+  /** @return host name */
   public String getBindAddress() {
     return address.getAddress().getHostAddress();
   }
-  
+
+  /** @return port number */
   public int getPort() {
     return address.getPort();
   }
-  
+
+  /** @return the InetSocketAddress */
   public InetSocketAddress getInetSocketAddress() {
     return address;
   }
-  
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public String toString() {
     return (stringValue == null ? "" : stringValue);
   }
 
-  //////////////////////////////////////////////////////////////////////////////
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean equals(Object o) {
+    return this.compareTo(o) == 0;
+  }
+  
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int hashCode() {
+    int result = this.address.hashCode();
+    result ^= this.stringValue.hashCode();
+    return result;
+  }
+  
+  //
   // Writable
-  //////////////////////////////////////////////////////////////////////////////
+  //
 
+  /**
+   * {@inheritDoc}
+   */
   public void readFields(DataInput in) throws IOException {
     String bindAddress = in.readUTF();
     int port = in.readInt();
@@ -101,6 +149,9 @@
     }
   }
 
+  /**
+   * {@inheritDoc}
+   */
   public void write(DataOutput out) throws IOException {
     if(address == null) {
       out.writeUTF("");
@@ -110,5 +161,17 @@
       out.writeUTF(address.getAddress().getHostAddress());
       out.writeInt(address.getPort());
     }
+  }
+  
+  //
+  // Comparable
+  //
+  
+  /**
+   * {@inheritDoc}
+   */
+  public int compareTo(Object o) {
+    HServerAddress other = (HServerAddress) o;
+    return this.toString().compareTo(other.toString());
   }
 }

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java?view=auto&rev=556754
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java Mon Jul 16 15:19:59 2007
@@ -0,0 +1,121 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * batch update operation
+ */
+public class BatchOperation implements Writable {
+  /** put operation */
+  public static final int PUT_OP = 1;
+  
+  /** delete operation */
+  public static final int DELETE_OP = 2;
+  
+  private int op;
+  private Text column;
+  private byte[] value;
+  
+  /** default constructor used by Writable */
+  public BatchOperation() {
+    this.op = 0;
+    this.column = new Text();
+    this.value = null;
+  }
+  
+  /**
+   * Creates a put operation
+   * 
+   * @param column column name
+   * @param value column value
+   */
+  public BatchOperation(Text column, byte[] value) {
+    this.op = PUT_OP;
+    this.column = column;
+    this.value = value;
+  }
+  
+  /**
+   * Creates a delete operation
+   * 
+   * @param column name of column to delete
+   */
+  public BatchOperation(Text column) {
+    this.op = DELETE_OP;
+    this.column = column;
+    this.value = null;
+  }
+
+  /**
+   * @return the column
+   */
+  public Text getColumn() {
+    return column;
+  }
+
+  /**
+   * @return the operation
+   */
+  public int getOp() {
+    return op;
+  }
+
+  /**
+   * @return the value
+   */
+  public byte[] getValue() {
+    return value;
+  }
+  
+  //
+  // Writable
+  //
+
+  /**
+   * {@inheritDoc}
+   */
+  public void readFields(DataInput in) throws IOException {
+    op = in.readInt();
+    column.readFields(in);
+    if(op == PUT_OP) {
+      value = new byte[in.readInt()];
+      in.readFully(value);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(op);
+    column.write(out);
+    if(op == PUT_OP) {
+      out.writeInt(value.length);
+      out.write(value);
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java?view=auto&rev=556754
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java Mon Jul 16 15:19:59 2007
@@ -0,0 +1,168 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A Writable object that contains a series of BatchOperations, grouped by row.
+ * 
+ * There is one BatchUpdate object per server, so a series of batch operations
+ * can result in multiple BatchUpdate objects if the batch contains rows that
+ * are served by multiple region servers.
+ *
+ * Lock ids handed out by startUpdate are local handles only: they live in
+ * this object and are not serialized by write.
+ */
+public class BatchUpdate implements Writable,
+    Iterable<Map.Entry<Text, ArrayList<BatchOperation>>> {
+  
+  // used to generate lock ids
+  private final Random rand;
+  
+  // used on client side to map lockid to a set of row updates
+  private final HashMap<Long, ArrayList<BatchOperation>> lockToRowOps;
+  
+  // the operations for each row
+  private final HashMap<Text, ArrayList<BatchOperation>> operations;
+  
+  /** constructor */
+  public BatchUpdate() {
+    this.rand = new Random();
+    this.lockToRowOps = new HashMap<Long, ArrayList<BatchOperation>>();
+    this.operations = new HashMap<Text, ArrayList<BatchOperation>>();
+  }
+
+  /** 
+   * Start a batch row insertion/update.
+   * 
+   * No changes are committed until the client commits the batch operation via
+   * HClient.commitBatch(). The entire batch can instead be abandoned through
+   * HClient's abort call (NOTE(review): older docs named it batchAbort();
+   * confirm the actual method name).
+   *
+   * Callers to this method are given a handle that corresponds to the row being
+   * changed. The handle must be supplied on subsequent put or delete calls so
+   * that the row can be identified. Starting the same row twice yields two
+   * handles that share the same list of pending operations.
+   * 
+   * @param row Name of row to start update against.
+   * @return Row lockid, always non-negative and unique within this batch.
+   */
+  public synchronized long startUpdate(Text row) {
+    ArrayList<BatchOperation> ops = operations.get(row);
+    if (ops == null) {
+      ops = new ArrayList<BatchOperation>();
+      operations.put(row, ops);
+    }
+    Long lockid = newLockId();
+    lockToRowOps.put(lockid, ops);
+    return lockid.longValue();
+  }
+
+  /**
+   * Picks a random non-negative lock id that is not already in use.
+   * Must be called while holding this object's monitor.
+   */
+  private Long newLockId() {
+    Long lockid;
+    do {
+      // Masking the sign bit guarantees a non-negative id;
+      // Math.abs(Long.MIN_VALUE) would still be negative.
+      lockid = Long.valueOf(rand.nextLong() & Long.MAX_VALUE);
+    } while (lockToRowOps.containsKey(lockid));
+    return lockid;
+  }
+  
+  /** 
+   * Change a value for the specified column
+   *
+   * @param lockid              - lock id returned from startUpdate
+   * @param column              - column whose value is being set
+   * @param val                 - new value for column
+   * @throws IllegalArgumentException if lockid was not issued by startUpdate
+   */
+  public synchronized void put(long lockid, Text column, byte[] val) {
+    ArrayList<BatchOperation> ops = lockToRowOps.get(lockid);
+    if (ops == null) {
+      throw new IllegalArgumentException("no row for lockid " + lockid);
+    }
+    ops.add(new BatchOperation(column, val));
+  }
+  
+  /** 
+   * Delete the value for a column
+   *
+   * @param lockid              - lock id returned from startUpdate
+   * @param column              - name of column whose value is to be deleted
+   * @throws IllegalArgumentException if lockid was not issued by startUpdate
+   */
+  public synchronized void delete(long lockid, Text column) {
+    ArrayList<BatchOperation> ops = lockToRowOps.get(lockid);
+    if (ops == null) {
+      throw new IllegalArgumentException("no row for lockid " + lockid);
+    }
+    ops.add(new BatchOperation(column));
+  }
+
+  //
+  // Iterable
+  //
+  
+  /**
+   * Iterates over the rows of this batch. The iterator is a live view backed
+   * by the internal map, not a copy.
+   *
+   * @return Iterator<Map.Entry<Text, ArrayList<BatchOperation>>>
+   *         Text row -> ArrayList<BatchOperation> changes
+   */
+  public Iterator<Map.Entry<Text, ArrayList<BatchOperation>>> iterator() {
+    return operations.entrySet().iterator();
+  }
+  
+  //
+  // Writable
+  //
+
+  /**
+   * Replaces the contents of this batch with the rows read from in, in the
+   * layout produced by write.
+   *
+   * @param in stream to read from
+   * @throws IOException if reading fails
+   */
+  public void readFields(DataInput in) throws IOException {
+    // A Writable may be reused for several records: start from an empty
+    // batch so rows from an earlier read (or from local startUpdate calls)
+    // do not leak into this one. Lock ids are never serialized, so none of
+    // the previously issued ones remain meaningful after a read.
+    operations.clear();
+    lockToRowOps.clear();
+    int nRows = in.readInt();
+    for (int i = 0; i < nRows; i++) {
+      Text row = new Text();
+      row.readFields(in);
+      
+      int nRowOps = in.readInt();
+      ArrayList<BatchOperation> rowOps = new ArrayList<BatchOperation>();
+      for (int j = 0; j < nRowOps; j++) {
+        BatchOperation op = new BatchOperation();
+        op.readFields(in);
+        rowOps.add(op);
+      }
+      
+      operations.put(row, rowOps);
+    }
+  }
+
+  /**
+   * Serializes the batch: the number of rows, then for each row its name,
+   * its operation count and each operation in order.
+   *
+   * @param out stream to write to
+   * @throws IOException if the underlying stream fails
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(operations.size());
+    for (Map.Entry<Text, ArrayList<BatchOperation>> e : operations.entrySet()) {
+      e.getKey().write(out);
+      
+      ArrayList<BatchOperation> ops = e.getValue();
+      out.writeInt(ops.size());
+      
+      for (BatchOperation op : ops) {
+        op.write(out);
+      }
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java?view=auto&rev=556754
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java	(added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java	Mon Jul 16 15:19:59 2007
@@ -0,0 +1,104 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Test batch updates: HClient must reject commitBatch() when no batch is in
+ * progress and openTable() while one is; the rows of a committed batch are
+ * then scanned back and printed.
+ */
+public class TestBatchUpdate extends HBaseClusterTestCase {
+  private static final String CONTENTS_STR = "contents:";
+  private static final Text CONTENTS = new Text(CONTENTS_STR);
+  private static final byte[] value = { 1, 2, 3, 4 };
+
+  private HTableDescriptor desc = null;
+  private HClient client = null;
+
+  /**
+   * Runs the superclass setup (presumably starting the test cluster), then
+   * creates and opens a "test" table with a single "contents:" family.
+   *
+   * @throws Exception if setup, table creation or table open fails; it is
+   *         propagated so JUnit reports the real cause
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    this.client = new HClient(conf);
+    this.desc = new HTableDescriptor("test");
+    desc.addFamily(new HColumnDescriptor(CONTENTS_STR));
+    client.createTable(desc);
+    client.openTable(desc.getName());
+  }
+
+  /**
+   * Exercises the batch update lifecycle.
+   *
+   * @throws Exception on any unexpected client or scanner failure
+   */
+  public void testBatchUpdate() throws Exception {
+    // Committing when no batch is in progress must be rejected. Without the
+    // fail() the check would pass even if nothing were thrown.
+    try {
+      client.commitBatch();
+      fail("commitBatch() with no batch in progress should have thrown "
+          + "IllegalStateException");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    client.startBatchUpdate();
+
+    // Switching tables while a batch is open must be rejected.
+    try {
+      client.openTable(HConstants.META_TABLE_NAME);
+      fail("openTable() during a batch update should have thrown "
+          + "IllegalStateException");
+    } catch (IllegalStateException e) {
+      // expected
+    }
+
+    long lockid = client.startUpdate(new Text("row1"));
+    client.put(lockid, CONTENTS, value);
+    client.delete(lockid, CONTENTS);
+
+    lockid = client.startUpdate(new Text("row2"));
+    client.put(lockid, CONTENTS, value);
+
+    client.commitBatch();
+
+    // NOTE(review): the scanned rows are only printed, not asserted.
+    Text[] columns = { CONTENTS };
+    HScannerInterface scanner = client.obtainScanner(columns, new Text());
+    try {
+      HStoreKey key = new HStoreKey();
+      TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+      while (scanner.next(key, results)) {
+        for (Map.Entry<Text, byte[]> e : results.entrySet()) {
+          System.out.println(key + ": column: " + e.getKey() + " value: " +
+              new String(e.getValue()));
+        }
+        // The same map is handed to every next() call; clear it so each
+        // row prints only its own columns (assumes next() does not clear it).
+        results.clear();
+      }
+    } finally {
+      scanner.close();
+    }
+  }
+}



Mime
View raw message