hbase-commits mailing list archives

From la...@apache.org
Subject svn commit: r1231441 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/regionse...
Date Sat, 14 Jan 2012 04:59:45 GMT
Author: larsh
Date: Sat Jan 14 04:59:44 2012
New Revision: 1231441

URL: http://svn.apache.org/viewvc?rev=1231441&view=rev
Log:
HBASE-3584 Allow atomic put/delete in one call (Lars H)
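
For context, a minimal client-side sketch of the new API this commit introduces, mirroring the calls exercised in TestFromClientSide below (table, family, and qualifier names are illustrative):

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.util.Bytes;

public class RowMutationExample {
  public static void main(String[] args) throws IOException {
    HTable table = new HTable(HBaseConfiguration.create(), "mytable");
    byte[] row = Bytes.toBytes("row1");

    // Collect a Put and a Delete; both must target the same row.
    RowMutation rm = new RowMutation(row);
    Put p = new Put(row);
    p.add(Bytes.toBytes("f"), Bytes.toBytes("a"), Bytes.toBytes("v"));
    rm.add(p);
    Delete d = new Delete(row);
    d.deleteColumn(Bytes.toBytes("f"), Bytes.toBytes("b"));
    rm.add(d);

    // Both mutations are applied atomically on the region server,
    // in the order they were added.
    table.mutateRow(rm);
    table.close();
  }
}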

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java
Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1231441&r1=1231440&r2=1231441&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Sat Jan 14 04:59:44 2012
@@ -752,6 +752,20 @@ public class HTable implements HTableInt
    * {@inheritDoc}
    */
   @Override
+  public void mutateRow(final RowMutation rm) throws IOException {
+    new ServerCallable<Void>(connection, tableName, rm.getRow(),
+        operationTimeout) {
+      public Void call() throws IOException {
+        server.mutateRow(location.getRegionInfo().getRegionName(), rm);
+        return null;
+      }
+    }.withRetries();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public Result append(final Append append) throws IOException {
     if (append.numFamilies() == 0) {
       throw new IOException(

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1231441&r1=1231440&r2=1231441&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Sat Jan 14 04:59:44 2012
@@ -265,6 +265,16 @@ public interface HTableInterface extends
       byte[] value, Delete delete) throws IOException;
 
   /**
+   * Performs multiple mutations atomically on a single row. Currently
+   * {@link Put} and {@link Delete} are supported.
+   *
+   * @param rm object that specifies the set of mutations to perform
+   * atomically
+   * @throws IOException
+   */
+  public void mutateRow(final RowMutation rm) throws IOException;
+
+  /**
    * Appends values to one or more columns within a single row.
    * <p>
    * This operation does not appear atomic to readers.  Appends are done

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java?rev=1231441&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java Sat Jan 14 04:59:44 2012
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Performs multiple mutations atomically on a single row.
+ * Currently {@link Put} and {@link Delete} are supported.
+ *
+ * The mutations are performed in the order in which they
+ * were added.
+ */
+public class RowMutation implements Row {
+  private List<Mutation> mutations = new ArrayList<Mutation>();
+  private byte [] row;
+  private static final byte VERSION = (byte)0;
+
+  /** Constructor for Writable. DO NOT USE */
+  public RowMutation() {}
+
+  /**
+   * Create an atomic mutation for the specified row.
+   * @param row row key
+   */
+  public RowMutation(byte [] row) {
+    if(row == null || row.length > HConstants.MAX_ROW_LENGTH) {
+      throw new IllegalArgumentException("Row key is invalid");
+    }
+    this.row = Arrays.copyOf(row, row.length);
+  }
+
+  /**
+   * Add a {@link Put} operation to the list of mutations
+   * @param p The {@link Put} to add
+   * @throws IOException
+   */
+  public void add(Put p) throws IOException {
+    internalAdd(p);
+  }
+
+  /**
+   * Add a {@link Delete} operation to the list of mutations
+   * @param d The {@link Delete} to add
+   * @throws IOException
+   */
+  public void add(Delete d) throws IOException {
+    internalAdd(d);
+  }
+
+  private void internalAdd(Mutation m) throws IOException {
+    int res = Bytes.compareTo(this.row, m.getRow());
+    if(res != 0) {
+      throw new IOException("The row in the recently added Put/Delete " +
+          Bytes.toStringBinary(m.getRow()) + " doesn't match the original one " +
+          Bytes.toStringBinary(this.row));
+    }
+    mutations.add(m);
+  }
+
+  @Override
+  public void readFields(final DataInput in) throws IOException {
+    int version = in.readByte();
+    if (version > VERSION) {
+      throw new IOException("version not supported");
+    }
+    this.row = Bytes.readByteArray(in);
+    int numMutations = in.readInt();
+    mutations.clear();
+    for(int i = 0; i < numMutations; i++) {
+      mutations.add((Mutation) HbaseObjectWritable.readObject(in, null));
+    }
+  }
+
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    out.writeByte(VERSION);
+    Bytes.writeByteArray(out, this.row);
+    out.writeInt(mutations.size());
+    for (Mutation m : mutations) {
+      HbaseObjectWritable.writeObject(out, m, m.getClass(), null);
+    }
+  }
+
+  @Override
+  public int compareTo(Row other) {
+    return Bytes.compareTo(this.getRow(), other.getRow());
+  }
+
+  @Override
+  public byte[] getRow() {
+    return row;
+  }
+
+  /**
+   * @return An unmodifiable list of the current mutations.
+   */
+  public List<Mutation> getMutations() {
+    return Collections.unmodifiableList(mutations);
+  }
+}
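
The write()/readFields() pair above defines the wire format: a version byte, the row key, a mutation count, and then each mutation framed by HbaseObjectWritable (which is why RowMutation is registered in HbaseObjectWritable further down). A round-trip sketch of that serialization, using plain byte-array streams for illustration:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.util.Bytes;

public class RowMutationRoundTrip {
  public static void main(String[] args) throws IOException {
    byte[] row = Bytes.toBytes("r1");
    RowMutation rm = new RowMutation(row);
    Put p = new Put(row);
    p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    rm.add(p);

    // Serialize: version byte, row key, count, then each mutation.
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    rm.write(new DataOutputStream(buf));

    // Deserialize into a fresh instance via the no-arg Writable constructor.
    RowMutation copy = new RowMutation();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(copy.getMutations().size()); // prints 1
  }
}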

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java?rev=1231441&r1=1231440&r2=1231441&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java Sat Jan 14 04:59:44 2012
@@ -501,6 +501,11 @@ public abstract class CoprocessorHost<E 
           byte[] row) {
         return table.coprocessorProxy(protocol, row);
       }
+
+      @Override
+      public void mutateRow(RowMutation rm) throws IOException {
+        table.mutateRow(rm);
+      }
     }
 
     /** The coprocessor */

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1231441&r1=1231440&r2=1231441&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Sat Jan 14 04:59:44 2012
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HServerLo
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -232,10 +233,12 @@ public class HbaseObjectWritable impleme
     addToMap(ColumnRangeFilter.class, code++);
 
     addToMap(HServerLoad.class, code++);
-    
+
     addToMap(RegionOpeningState.class, code++);
 
     addToMap(Append.class, code++);
+
+    addToMap(RowMutation.class, code++);
   }
 
   private Class<?> declaredClass;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1231441&r1=1231440&r2=1231441&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Sat Jan 14 04:59:44 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HServerIn
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -262,6 +263,9 @@ public interface HRegionInterface extend
       byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
   throws IOException;
 
+  public void mutateRow(byte[] regionName, RowMutation rm)
+      throws IOException;
+
   /**
    * Appends values to one or more columns in a row. Optionally
    * returns the updated keys after the append.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1231441&r1=1231440&r2=1231441&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Jan 14 04:59:44 2012
@@ -77,10 +77,12 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Row;
@@ -1684,7 +1686,7 @@ public class HRegion implements HeapSize
       try {
         // All edits for the given row (across all column families) must happen atomically.
         prepareDelete(delete);
-        internalDelete(delete, delete.getClusterId(), writeToWAL);
+        internalDelete(delete, delete.getClusterId(), writeToWAL, null, null);
       } finally {
         if(lockid == null) releaseRowLock(lid);
       }
@@ -1705,21 +1707,26 @@ public class HRegion implements HeapSize
     delete.setFamilyMap(familyMap);
     delete.setClusterId(clusterId);
     delete.setWriteToWAL(writeToWAL);
-    internalDelete(delete, clusterId, writeToWAL);
+    internalDelete(delete, clusterId, writeToWAL, null, null);
   }
 
   /**
+   * @param delete The Delete command
    * @param familyMap map of family to edits for the given family.
    * @param writeToWAL
+   * @param writeEntry Optional mvcc write point to use
+   * @param walEdit Optional walEdit to use. A non-null walEdit indicates
+   * that the coprocessor hooks are run by the caller
    * @throws IOException
    */
   private void internalDelete(Delete delete, UUID clusterId,
-      boolean writeToWAL) throws IOException {
+      boolean writeToWAL, MultiVersionConsistencyControl.WriteEntry writeEntry,
+      WALEdit walEdit) throws IOException {
     Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
-    WALEdit walEdit = new WALEdit();
+    WALEdit localWalEdit = walEdit == null ? new WALEdit() : walEdit;
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
-    if (coprocessorHost != null) {
-      if (coprocessorHost.preDelete(delete, walEdit, writeToWAL)) {
+    if (coprocessorHost != null && walEdit == null) {
+      if (coprocessorHost.preDelete(delete, localWalEdit, writeToWAL)) {
         return;
       }
     }
@@ -1783,23 +1790,22 @@ public class HRegion implements HeapSize
         //
         // bunch up all edits across all column families into a
         // single WALEdit.
-        addFamilyMapToWALEdit(familyMap, walEdit);
+        addFamilyMapToWALEdit(familyMap, localWalEdit);
         this.log.append(regionInfo, this.htableDescriptor.getName(),
-            walEdit, clusterId, now, this.htableDescriptor);
+            localWalEdit, clusterId, now, this.htableDescriptor);
       }
 
       // Now make changes to the memstore.
-      long addedSize = applyFamilyMapToMemstore(familyMap, null);
+      long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry);
       flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
 
-      if (coprocessorHost != null) {
-        coprocessorHost.postDelete(delete, walEdit, writeToWAL);
-      }
     } finally {
       this.updatesLock.readLock().unlock();
     }
-
     // do after lock
+    if (coprocessorHost != null && walEdit == null) {
+      coprocessorHost.postDelete(delete, localWalEdit, writeToWAL);
+    }
     final long after = EnvironmentEdgeManager.currentTimeMillis();
     final String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(
         getTableDesc().getNameAsString(), familyMap.keySet());
@@ -1870,7 +1876,7 @@ public class HRegion implements HeapSize
 
       try {
         // All edits for the given row (across all column families) must happen atomically.
-        internalPut(put, put.getClusterId(), writeToWAL);
+        internalPut(put, put.getClusterId(), writeToWAL, null, null);
       } finally {
         if(lockid == null) releaseRowLock(lid);
       }
@@ -2299,11 +2305,13 @@ public class HRegion implements HeapSize
           // originating cluster. A slave cluster receives the result as a Put
           // or Delete
           if (isPut) {
-            internalPut(((Put)w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
+            internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL,
+                null, null);
           } else {
             Delete d = (Delete)w;
             prepareDelete(d);
-            internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
+            internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL, null,
+                null);
           }
           return true;
         }
@@ -2398,7 +2406,7 @@ public class HRegion implements HeapSize
     p.setFamilyMap(familyMap);
     p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
     p.setWriteToWAL(true);
-    this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true);
+    this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true, null, null);
   }
 
   /**
@@ -2406,15 +2414,18 @@ public class HRegion implements HeapSize
    * Warning: Assumption is caller has lock on passed in row.
    * @param put The Put command
    * @param writeToWAL if true, then we should write to the log
+   * @param writeEntry Optional mvcc write point to use
+   * @param walEdit Optional walEdit to use. A non-null walEdit indicates
+   * that the coprocessor hooks are run by the caller
    * @throws IOException
    */
-  private void internalPut(Put put, UUID clusterId,
-      boolean writeToWAL) throws IOException {
+  private void internalPut(Put put, UUID clusterId, boolean writeToWAL,
+      MultiVersionConsistencyControl.WriteEntry writeEntry, WALEdit walEdit) throws IOException {
     Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
-    WALEdit walEdit = new WALEdit();
+    WALEdit localWalEdit = walEdit == null ? new WALEdit() : walEdit;
     /* run pre put hook outside of lock to avoid deadlock */
-    if (coprocessorHost != null) {
-      if (coprocessorHost.prePut(put, walEdit, writeToWAL)) {
+    if (coprocessorHost != null && walEdit == null) {
+      if (coprocessorHost.prePut(put, localWalEdit, writeToWAL)) {
         return;
       }
     }
@@ -2434,19 +2445,19 @@ public class HRegion implements HeapSize
       // for some reason fail to write/sync to commit log, the memstore
       // will contain uncommitted transactions.
       if (writeToWAL) {
-        addFamilyMapToWALEdit(familyMap, walEdit);
+        addFamilyMapToWALEdit(familyMap, localWalEdit);
         this.log.append(regionInfo, this.htableDescriptor.getName(),
-            walEdit, clusterId, now, this.htableDescriptor);
+            localWalEdit, clusterId, now, this.htableDescriptor);
       }
 
-      long addedSize = applyFamilyMapToMemstore(familyMap, null);
+      long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry);
       flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
     } finally {
       this.updatesLock.readLock().unlock();
     }
 
-    if (coprocessorHost != null) {
-      coprocessorHost.postPut(put, walEdit, writeToWAL);
+    if (coprocessorHost != null && walEdit == null) {
+      coprocessorHost.postPut(put, localWalEdit, writeToWAL);
     }
 
     // do after lock
@@ -4129,6 +4140,95 @@ public class HRegion implements HeapSize
     return results;
   }
 
+  public int mutateRow(RowMutation rm,
+      Integer lockid) throws IOException {
+
+    startRegionOperation();
+    List<WALEdit> walEdits = new ArrayList<WALEdit>(rm.getMutations().size());
+
+    // 1. run all pre-hooks before the atomic operation
+    // if any pre hook indicates "bypass", bypass the entire operation
+    // Note that this requires creating the WALEdits here and passing
+    // them to the actual Put/Delete operations.
+    for (Mutation m : rm.getMutations()) {
+      WALEdit walEdit = new WALEdit();
+      walEdits.add(walEdit);
+      if (coprocessorHost == null) {
+        continue;
+      }
+      if (m instanceof Put) {
+        if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
+          // bypass everything
+          return 0;
+        }
+      } else if (m instanceof Delete) {
+        Delete d = (Delete) m;
+        prepareDelete(d);
+        if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) {
+          // bypass everything
+          return 0;
+        }
+      }
+    }
+
+    // 2. acquire the row lock
+    Integer lid = getLock(lockid, rm.getRow(), true);
+
+    // 3. acquire the region lock
+    this.updatesLock.readLock().lock();
+
+    // 4. Get a mvcc write number
+    MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
+    try {
+      int i = 0;
+      // 5. Perform the actual mutations
+      for (Mutation m : rm.getMutations()) {
+        if (m instanceof Put) {
+          internalPut((Put) m, HConstants.DEFAULT_CLUSTER_ID,
+              m.getWriteToWAL(), w, walEdits.get(i));
+        } else if (m instanceof Delete) {
+          Delete d = (Delete) m;
+          prepareDelete(d);
+          internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, d.getWriteToWAL(),
+              w, walEdits.get(i));
+        } else {
+          throw new DoNotRetryIOException(
+              "Action must be Put or Delete. But was: "
+                  + m.getClass().getName());
+        }
+        i++;
+      }
+      return i;
+    } finally {
+      // 6. roll mvcc forward
+      mvcc.completeMemstoreInsert(w);
+      // 7. release region lock
+      this.updatesLock.readLock().unlock();
+      try {
+        // 8. run all coprocessor post hooks
+        if (coprocessorHost != null) {
+          int i = 0;
+          for (Mutation m : rm.getMutations()) {
+            if (m instanceof Put) {
+              coprocessorHost.postPut((Put) m, walEdits.get(i),
+                  m.getWriteToWAL());
+            } else if (m instanceof Delete) {
+              coprocessorHost.postDelete((Delete) m, walEdits.get(i),
+                  m.getWriteToWAL());
+            }
+            i++;
+          }
+        }
+      } finally {
+        if (lid != null) {
+          // 9. release the row lock
+          releaseRowLock(lid);
+        }
+        closeRegionOperation();
+      }
+    }
+  }
+
  // TODO: There's a lot of boilerplate code identical
   // to increment... See how to better unify that.
   /**

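A note on why mutateRow() above is atomic for readers: every memstore edit in the loop is tagged with the single MVCC write entry acquired in step 4, so no concurrent get or scan can observe a partially applied row. Stripped to its essentials, the pattern looks like this (a non-runnable sketch; mvcc, rm, and applyFamilyMapToMemstore() are HRegion internals, and the real method also threads WAL edits and coprocessor hooks through internalPut()/internalDelete() as shown above):

// Sketch only -- not a standalone program.
MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
try {
  for (Mutation m : rm.getMutations()) {
    // Every edit carries the same write number, so none of them is
    // visible to readers yet.
    applyFamilyMapToMemstore(m.getFamilyMap(), w);
  }
} finally {
  // Roll the read point forward: all edits become visible together.
  mvcc.completeMemstoreInsert(w);
}
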
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1231441&r1=1231440&r2=1231441&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Jan 14 04:59:44 2012
@@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.catalog.M
 import org.apache.hadoop.hbase.catalog.RootLocationEditor;
 import org.apache.hadoop.hbase.client.Action;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -3152,6 +3153,27 @@ public class HRegionServer implements HR
   }
 
   @Override
+  public void mutateRow(byte[] regionName, RowMutation rm)
+      throws IOException {
+    checkOpen();
+    if (regionName == null) {
+      throw new IOException("Invalid arguments to atomicMutation " +
+      "regionName is null");
+    }
+    requestCount.incrementAndGet();
+    try {
+      HRegion region = getRegion(regionName);
+      if (!region.getRegionInfo().isMetaTable()) {
+        this.cacheFlusher.reclaimMemStoreMemory();
+      }
+      region.mutateRow(rm, null);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  @Override
   public Result append(byte[] regionName, Append append)
   throws IOException {
     checkOpen();
@@ -3296,6 +3318,9 @@ public class HRegionServer implements HR
           } else if (action instanceof Append) {
             response.add(regionName, originalIndex,
                 append(regionName, (Append)action));
+          } else if (action instanceof RowMutation) {
+            mutateRow(regionName, (RowMutation)action);
+            response.add(regionName, originalIndex, new Result());
           } else {
             LOG.debug("Error: invalid Action, row must be a Get, Delete, " +
                 "Put, Exec, Increment, or Append.");

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1231441&r1=1231440&r2=1231441&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Sat Jan 14 04:59:44 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -647,4 +648,9 @@ public class RemoteHTable implements HTa
       throws IOException, Throwable {
     throw new UnsupportedOperationException("coprocessorExec not implemented");
   }
+
+  @Override
+  public void mutateRow(RowMutation rm) throws IOException {
+    throw new IOException("atomicMutation not supported");
+  }
 }

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1231441&r1=1231440&r2=1231441&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Sat Jan 14 04:59:44 2012
@@ -34,6 +34,8 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -4036,6 +4038,34 @@ public class TestFromClientSide {
   }
 
   @Test
+  public void testRowMutation() throws Exception {
+    LOG.info("Starting testRowMutation");
+    final byte [] TABLENAME = Bytes.toBytes("testRowMutation");
+    HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY);
+    byte [][] QUALIFIERS = new byte [][] {
+        Bytes.toBytes("a"), Bytes.toBytes("b")
+    };
+    RowMutation arm = new RowMutation(ROW);
+    arm.add(new Delete(ROW));
+    Put p = new Put(ROW);
+    p.add(FAMILY, QUALIFIERS[0], VALUE);
+    arm.add(p);
+    t.mutateRow(arm);
+
+    Get g = new Get(ROW);
+    Result r = t.get(g);
+    // delete was first, row should exist
+    assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
+
+    arm = new RowMutation(ROW);
+    arm.add(p);
+    arm.add(new Delete(ROW));
+    t.batch(Arrays.asList((Row)arm));
+    r = t.get(g);
+    assertTrue(r.isEmpty());
+  }
+
+  @Test
   public void testAppend() throws Exception {
     LOG.info("Starting testAppend");
     final byte [] TABLENAME = Bytes.toBytes("testAppend");

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java?rev=1231441&r1=1231440&r2=1231441&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java Sat Jan 14 04:59:44 2012
@@ -132,6 +132,41 @@ public class TestRegionObserverInterface
   }
 
   @Test
+  public void testRowMutation() throws IOException {
+    byte[] tableName = TEST_TABLE;
+    HTable table = util.createTable(tableName, new byte[][] {A, B, C});
+    verifyMethodResult(SimpleRegionObserver.class,
+        new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+            "hadDeleted"},
+        TEST_TABLE,
+        new Boolean[] {false, false, false, false, false});
+
+    Put put = new Put(ROW);
+    put.add(A, A, A);
+    put.add(B, B, B);
+    put.add(C, C, C);
+
+    Delete delete = new Delete(ROW);
+    delete.deleteColumn(A, A);
+    delete.deleteColumn(B, B);
+    delete.deleteColumn(C, C);
+
+    RowMutation arm = new RowMutation(ROW);
+    arm.add(put);
+    arm.add(delete);
+    table.mutateRow(arm);
+
+    verifyMethodResult(SimpleRegionObserver.class,
+        new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+            "hadDeleted"},
+        TEST_TABLE,
+        new Boolean[] {false, false, true, true, true}
+    );
+    util.deleteTable(tableName);
+    table.close();
+  }
+
+  @Test
   public void testIncrementHook() throws IOException {
     byte[] tableName = TEST_TABLE;
 

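The observer test above relies on mutateRow() driving each queued Put and Delete through the usual pre/post coprocessor hooks. For illustration, a minimal observer against the RegionObserver API of this era (the class and its counting logic are hypothetical, not part of this commit):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

// Counts the Puts and Deletes that reach this region; a RowMutation
// carrying one Put and one Delete increments each counter exactly once.
public class CountingObserver extends BaseRegionObserver {
  private final AtomicInteger puts = new AtomicInteger();
  private final AtomicInteger deletes = new AtomicInteger();

  @Override
  public void postPut(ObserverContext<RegionCoprocessorEnvironment> c,
      Put put, WALEdit edit, boolean writeToWAL) throws IOException {
    puts.incrementAndGet();
  }

  @Override
  public void postDelete(ObserverContext<RegionCoprocessorEnvironment> c,
      Delete delete, WALEdit edit, boolean writeToWAL) throws IOException {
    deletes.incrementAndGet();
  }
}
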
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java?rev=1231441&r1=1231440&r2=1231441&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java Sat Jan 14 04:59:44 2012
@@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.regionse
 
 
 import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -26,10 +29,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.RowMutation;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.junit.experimental.categories.Category;
 
@@ -239,7 +245,98 @@ public class TestAtomicOperation extends
     }
   }
 
+  /**
+   * Test multi-threaded row mutations.
+   */
+  public void testRowMutationMultiThreads() throws IOException {
+
+    LOG.info("Starting test testMutationMultiThreads");
+    initHRegion(tableName, getName(), fam1);
+
+    // create 100 threads, each will alternate between adding and
+    // removing a column
+    int numThreads = 100;
+    int opsPerThread = 1000;
+    AtomicOperation[] all = new AtomicOperation[numThreads];
+
+    AtomicLong timeStamps = new AtomicLong(0);
+    AtomicInteger failures = new AtomicInteger(0);
+    // create all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures);
+    }
+
+    // run all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i].start();
+    }
+
+    // wait for all threads to finish
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        all[i].join();
+      } catch (InterruptedException e) {
+      }
+    }
+    assertEquals(0, failures.get());
+  }
+
 
+  public static class AtomicOperation extends Thread {
+    private final HRegion region;
+    private final int numOps;
+    private final AtomicLong timeStamps;
+    private final AtomicInteger failures;
+    private final Random r = new Random();
+    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, AtomicInteger failures) {
+      this.region = region;
+      this.numOps = numOps;
+      this.timeStamps = timeStamps;
+      this.failures = failures;
+    }
+    @Override
+    public void run() {
+      boolean op = true;
+      for (int i=0; i<numOps; i++) {
+        try {
+          // throw in some flushes
+          if (r.nextFloat() < 0.001) {
+            LOG.debug("flushing");
+            region.flushcache();
+          }
+          long ts = timeStamps.incrementAndGet();
+          RowMutation arm = new RowMutation(row);
+          if (op) {
+            Put p = new Put(row, ts);
+            p.add(fam1, qual1, value1);
+            arm.add(p);
+            Delete d = new Delete(row);
+            d.deleteColumns(fam1, qual2, ts);
+            arm.add(d);
+          } else {
+            Delete d = new Delete(row);
+            d.deleteColumns(fam1, qual1, ts);
+            arm.add(d);
+            Put p = new Put(row, ts);
+            p.add(fam1, qual2, value2);
+            arm.add(p);
+          }
+          region.mutateRow(arm, null);
+          op ^= true;
+          // check: should always see exactly one column
+          Get g = new Get(row);
+          Result result = region.get(g, null);
+          if (result.size() != 1) {
+            LOG.debug(result);
+            failures.incrementAndGet();
+            fail();
+          }
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
 
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =


