hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject svn commit: r1502077 - in /hbase/branches/0.95: hbase-client/src/main/java/org/apache/hadoop/hbase/ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ hbase-server/src/main/ruby/h...
Date Thu, 11 Jul 2013 01:03:49 GMT
Author: enis
Date: Thu Jul 11 01:03:48 2013
New Revision: 1502077

URL: http://svn.apache.org/r1502077
Log:
HBASE-8375 Durability setting per table

Modified:
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.95/hbase-server/src/main/ruby/hbase/admin.rb
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java?rev=1502077&r1=1502076&r2=1502077&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
(original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
Thu Jul 11 01:03:48 2013
@@ -34,9 +34,12 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.regex.Matcher;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -63,6 +66,8 @@ import com.google.protobuf.InvalidProtoc
 @InterfaceStability.Evolving
 public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
 
+  private static final Log LOG = LogFactory.getLog(HTableDescriptor.class);
+
   /**
    *  Changes prior to version 3 were not recorded here.
    *  Version 3 adds metadata as a map where keys and values are byte[].
@@ -153,12 +158,25 @@ public class HTableDescriptor implements
 
   /**
    * <em>INTERNAL</em> Used by HBase Shell interface to access this metadata
-   * attribute which denotes if the deferred log flush option is enabled
+   * attribute which denotes if the deferred log flush option is enabled.
+   * @deprecated Use {@link #DURABILITY} instead.
    */
+  @Deprecated
   public static final String DEFERRED_LOG_FLUSH = "DEFERRED_LOG_FLUSH";
+  @Deprecated
   private static final ImmutableBytesWritable DEFERRED_LOG_FLUSH_KEY =
     new ImmutableBytesWritable(Bytes.toBytes(DEFERRED_LOG_FLUSH));
 
+  /**
+   * <em>INTERNAL</em> {@link Durability} setting for the table.
+   */
+  public static final String DURABILITY = "DURABILITY";
+  private static final ImmutableBytesWritable DURABILITY_KEY =
+      new ImmutableBytesWritable(Bytes.toBytes("DURABILITY"));
+
+  /** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */
+  private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT;
+
   /*
    *  The below are ugly but better than creating them each time till we
    *  replace booleans being saved as Strings with plain booleans.  Need a
@@ -195,6 +213,7 @@ public class HTableDescriptor implements
         String.valueOf(DEFAULT_MEMSTORE_FLUSH_SIZE));
     DEFAULT_VALUES.put(DEFERRED_LOG_FLUSH,
         String.valueOf(DEFAULT_DEFERRED_LOG_FLUSH));
+    DEFAULT_VALUES.put(DURABILITY, DEFAULT_DURABLITY.name()); //use the enum name
     for (String s : DEFAULT_VALUES.keySet()) {
       RESERVED_KEYWORDS.add(new ImmutableBytesWritable(Bytes.toBytes(s)));
     }
@@ -210,10 +229,11 @@ public class HTableDescriptor implements
    * Cache of whether this is root table or not.
    */
   private volatile Boolean root = null;
+
   /**
-   * Cache of whether deferred logging set.
+   * Durability setting for the table
    */
-  private Boolean deferredLog = null;
+  private Durability durability = null;
 
   /**
    * Maps column family name to the respective HColumnDescriptors
@@ -374,7 +394,6 @@ public class HTableDescriptor implements
       final boolean valueIfNull) {
     byte [] value = getValue(key);
     if (value != null) {
-      // TODO: Make value be a boolean rather than String of boolean.
       return Boolean.valueOf(Bytes.toString(value));
     }
     return valueIfNull;
@@ -525,6 +544,13 @@ public class HTableDescriptor implements
    */
   public void setValue(final ImmutableBytesWritable key,
       final ImmutableBytesWritable value) {
+    if (key.compareTo(DEFERRED_LOG_FLUSH_KEY) == 0) {
+      boolean isDeferredFlush = Boolean.valueOf(Bytes.toString(value.get()));
+      LOG.warn("HTableDescriptor property:" + DEFERRED_LOG_FLUSH + " is deprecated, " +
+          "use " + DURABILITY + " instead");
+      setDurability(isDeferredFlush ? Durability.ASYNC_WAL : DEFAULT_DURABLITY);
+      return;
+    }
     values.put(key, value);
   }
 
@@ -591,13 +617,11 @@ public class HTableDescriptor implements
    * @return true if that deferred log flush is enabled on the table
    *
    * @see #setDeferredLogFlush(boolean)
+   * @deprecated use {@link #getDurability()}
    */
+  @Deprecated
   public synchronized boolean isDeferredLogFlush() {
-    if(this.deferredLog == null) {
-      this.deferredLog =
-          isSomething(DEFERRED_LOG_FLUSH_KEY, DEFAULT_DEFERRED_LOG_FLUSH);
-    }
-    return this.deferredLog;
+    return getDurability() == Durability.ASYNC_WAL;
   }
 
   /**
@@ -613,10 +637,42 @@ public class HTableDescriptor implements
    * </p>
    *
    * @param isDeferredLogFlush
+   * @deprecated use {@link #setDurability(Durability)}
    */
+  @Deprecated
   public synchronized void setDeferredLogFlush(final boolean isDeferredLogFlush) {
-    setValue(DEFERRED_LOG_FLUSH_KEY, isDeferredLogFlush? TRUE: FALSE);
-    this.deferredLog = isDeferredLogFlush;
+    this.setDurability(isDeferredLogFlush ? Durability.ASYNC_WAL : DEFAULT_DURABLITY);
+  }
+
+  /**
+   * Sets the {@link Durability} setting for the table. This defaults to Durability.USE_DEFAULT.
+   * @param durability enum value
+   */
+  public void setDurability(Durability durability) {
+    this.durability = durability;
+    setValue(DURABILITY_KEY, durability.name());
+  }
+
+  /**
+   * Returns the durability setting for the table.
+   * @return durability setting for the table.
+   */
+  public Durability getDurability() {
+    if (this.durability == null) {
+      byte[] durabilityValue = getValue(DURABILITY_KEY);
+      if (durabilityValue == null) {
+        this.durability = DEFAULT_DURABLITY;
+      } else {
+        try {
+          this.durability = Durability.valueOf(Bytes.toString(durabilityValue));
+        } catch (IllegalArgumentException ex) {
+          LOG.warn("Received " + ex + " because Durability value for HTableDescriptor"
+            + " is not known. Durability:" + Bytes.toString(durabilityValue));
+          this.durability = DEFAULT_DURABLITY;
+        }
+      }
+    }
+    return this.durability;
   }
 
   /**

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java?rev=1502077&r1=1502076&r2=1502077&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java
(original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Durability.java
Thu Jul 11 01:03:48 2013
@@ -22,14 +22,16 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * Enum describing the durability guarantees for {@link Mutation}
+ * Enum describing the durability guarantees for tables and {@link Mutation}s
  * Note that the items must be sorted in order of increasing durability
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public enum Durability {
+  /* Developer note: Do not rename the enum field names. They are serialized in HTableDescriptor */
   /**
-   * Use the column family's default setting to determine durability.
+   * If this is for tables durability, use HBase's global default value (SYNC_WAL).
+   * Otherwise, if this is for mutation, use the table's default setting to determine durability.
    * This must remain the first option.
    */
   USE_DEFAULT,

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1502077&r1=1502076&r2=1502077&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Thu Jul 11 01:03:48 2013
@@ -109,8 +109,8 @@ import org.apache.hadoop.hbase.io.HeapSi
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
@@ -192,6 +192,12 @@ public class HRegion implements HeapSize
   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
       "hbase.hregion.scan.loadColumnFamiliesOnDemand";
 
+  /**
+   * This is the global default value for durability. All tables/mutations not
+   * defining a durability or using USE_DEFAULT will default to this value.
+   */
+  private static final Durability DEFAULT_DURABLITY = Durability.SYNC_WAL;
+
   final AtomicBoolean closed = new AtomicBoolean(false);
   /* Closing can take some time; use the closing flag if there is stuff we don't
    * want to do while in closing state; e.g. like offer this region up to the
@@ -399,6 +405,7 @@ public class HRegion implements HeapSize
   private final MetricsRegion metricsRegion;
   private final MetricsRegionWrapperImpl metricsRegionWrapper;
   private final boolean deferredLogSyncDisabled;
+  private final Durability durability;
 
   /**
    * HRegion constructor. This constructor should only be used for testing and
@@ -508,6 +515,9 @@ public class HRegion implements HeapSize
     // When hbase.regionserver.optionallogflushinterval <= 0 , deferred log sync is disabled.
     this.deferredLogSyncDisabled = conf.getLong("hbase.regionserver.optionallogflushinterval",
         1 * 1000) <= 0;
+    this.durability = htd.getDurability() == Durability.USE_DEFAULT
+        ? DEFAULT_DURABLITY
+        : htd.getDurability();
     if (rsServices != null) {
       this.rsAccounting = this.rsServices.getRegionServerAccounting();
       // don't initialize coprocessors if not running within a regionserver
@@ -651,6 +661,7 @@ public class HRegion implements HeapSize
       for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
         status.setStatus("Instantiating store for column family " + family);
         completionService.submit(new Callable<HStore>() {
+          @Override
           public HStore call() throws IOException {
             return instantiateHStore(family);
           }
@@ -826,7 +837,7 @@ public class HRegion implements HeapSize
   public void setRecovering(boolean newState) {
     this.getRegionInfo().setRecovering(newState);
   }
-  
+
   /**
    * @return True if current region is in recovering
    */
@@ -896,7 +907,7 @@ public class HRegion implements HeapSize
   private final Object closeLock = new Object();
 
   /** Conf key for the periodic flush interval */
-  public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = 
+  public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
       "hbase.regionserver.optionalcacheflushinterval";
   /** Default interval for the memstore flush */
   public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
@@ -992,6 +1003,7 @@ public class HRegion implements HeapSize
         for (final Store store : stores.values()) {
           completionService
              .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
+                @Override
                 public Pair<byte[], Collection<StoreFile>> call() throws IOException {
                   return new Pair<byte[], Collection<StoreFile>>(
                     store.getFamily().getName(), store.close());
@@ -1082,6 +1094,7 @@ public class HRegion implements HeapSize
       new ThreadFactory() {
         private int count = 1;
 
+        @Override
         public Thread newThread(Runnable r) {
           return new Thread(r, threadNamePrefix + "-" + count++);
         }
@@ -1536,7 +1549,7 @@ public class HRegion implements HeapSize
 
     // sync unflushed WAL changes when deferred log sync is enabled
     // see HBASE-8208 for details
-    if (wal != null && isDeferredLogSyncEnabled()) {
+    if (wal != null && !shouldSyncLog()) {
       wal.sync();
     }
 
@@ -1921,7 +1934,7 @@ public class HRegion implements HeapSize
       Pair<Mutation, Integer>[] mutationsAndLocks) throws IOException {
     return batchMutate(mutationsAndLocks, false);
   }
- 
+
   /**
    * Perform a batch of mutations.
    * It supports only Put and Delete mutations and will ignore other types passed.
@@ -1970,7 +1983,7 @@ public class HRegion implements HeapSize
     }
     return batchOp.retCodeDetails;
   }
-  
+
 
   private void doPreMutationHook(BatchOperationInProgress<Pair<Mutation, Integer>> batchOp)
       throws IOException {
@@ -2125,7 +2138,7 @@ public class HRegion implements HeapSize
           }
         }
       }
-      
+
       // we should record the timestamp only after we have acquired the rowLock,
       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
       now = EnvironmentEdgeManager.currentTimeMillis();
@@ -2165,8 +2178,8 @@ public class HRegion implements HeapSize
 
       // calling the pre CP hook for batch mutation
       if (!isInReplay && coprocessorHost != null) {
-        MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp = 
-          new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations, 
+        MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
+          new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
         if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
       }
@@ -2202,7 +2215,7 @@ public class HRegion implements HeapSize
         batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
 
         Mutation m = batchOp.operations[i].getFirst();
-        Durability tmpDur = m.getDurability();
+        Durability tmpDur = getEffectiveDurability(m.getDurability());
         if (tmpDur.ordinal() > durability.ordinal()) {
           durability = tmpDur;
         }
@@ -2225,8 +2238,10 @@ public class HRegion implements HeapSize
       // STEP 5. Append the edit to WAL. Do not sync wal.
       // -------------------------
       Mutation first = batchOp.operations[firstIndex].getFirst();
-      txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
+      if (walEdit.size() > 0) {
+        txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
                walEdit, first.getClusterId(), now, this.htableDescriptor);
+      }
 
       // -------------------------------
       // STEP 6. Release row locks, etc.
@@ -2250,8 +2265,8 @@ public class HRegion implements HeapSize
       walSyncSuccessful = true;
       // calling the post CP hook for batch mutation
       if (!isInReplay && coprocessorHost != null) {
-        MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp = 
-          new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations, 
+        MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
+          new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
         coprocessorHost.postBatchMutate(miniBatchOp);
       }
@@ -2333,6 +2348,14 @@ public class HRegion implements HeapSize
     }
   }
 
+  /**
+   * Returns effective durability from the passed durability and
+   * the table descriptor.
+   */
+  protected Durability getEffectiveDurability(Durability d) {
+    return d == Durability.USE_DEFAULT ? this.durability : d;
+  }
+
   //TODO, Think that gets/puts and deletes should be refactored a bit so that
   //the getting of the lock happens before, so that you would just pass it into
   //the methods. So in the case of checkAndMutate you could just do lockRow,
@@ -3482,6 +3505,7 @@ public class HRegion implements HeapSize
     private long maxResultSize;
     private HRegion region;
 
+    @Override
     public HRegionInfo getRegionInfo() {
       return region.getRegionInfo();
     }
@@ -3674,6 +3698,7 @@ public class HRegion implements HeapSize
     /*
      * @return True if a filter rules the scanner is over, done.
      */
+    @Override
     public synchronized boolean isFilterDone() throws IOException {
       return this.filter != null && this.filter.filterAllRemaining();
     }
@@ -4634,7 +4659,7 @@ public class HRegion implements HeapSize
           }
           // 10. Sync edit log
           if (txid != 0) {
-            syncOrDefer(txid, processor.useDurability());
+            syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
           }
           walSyncSuccessful = true;
         }
@@ -4742,7 +4767,8 @@ public class HRegion implements HeapSize
     byte[] row = append.getRow();
     checkRow(row, "append");
     boolean flush = false;
-    boolean writeToWAL = append.getDurability() != Durability.SKIP_WAL;
+    Durability durability = getEffectiveDurability(append.getDurability());
+    boolean writeToWAL = durability != Durability.SKIP_WAL;
     WALEdit walEdits = null;
     List<KeyValue> allKVs = new ArrayList<KeyValue>(append.size());
     Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
@@ -4877,7 +4903,7 @@ public class HRegion implements HeapSize
       }
       if (writeToWAL) {
         // sync the transaction log outside the rowlock
-        syncOrDefer(txid, append.getDurability());
+        syncOrDefer(txid, durability);
       }
     } finally {
       if (w != null) {
@@ -4911,7 +4937,8 @@ public class HRegion implements HeapSize
     checkRow(row, "increment");
     TimeRange tr = increment.getTimeRange();
     boolean flush = false;
-    boolean writeToWAL = increment.getDurability() != Durability.SKIP_WAL;
+    Durability durability = getEffectiveDurability(increment.getDurability());
+    boolean writeToWAL = durability != Durability.SKIP_WAL;
     WALEdit walEdits = null;
     List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.size());
     Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
@@ -5022,7 +5049,7 @@ public class HRegion implements HeapSize
       }
       if (writeToWAL) {
         // sync the transaction log outside the rowlock
-        syncOrDefer(txid, increment.getDurability());
+        syncOrDefer(txid, durability);
       }
     } finally {
       if (w != null) {
@@ -5059,7 +5086,7 @@ public class HRegion implements HeapSize
       ClassSize.OBJECT +
       ClassSize.ARRAY +
       39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (11 * Bytes.SIZEOF_LONG) +
+      (12 * Bytes.SIZEOF_LONG) +
       2 * Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
@@ -5362,7 +5389,7 @@ public class HRegion implements HeapSize
     case DELETE:
     case BATCH_MUTATE:
       // when a region is in recovering state, no read, split or merge is allowed
-      if (this.isRecovering() && (this.disallowWritesInRecovering || 
+      if (this.isRecovering() && (this.disallowWritesInRecovering ||
               (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
         throw new RegionInRecoveryException(this.getRegionNameAsString() + " is recovering");
       }
@@ -5485,8 +5512,8 @@ public class HRegion implements HeapSize
     } else {
       switch(durability) {
       case USE_DEFAULT:
-        // do what CF defaults to
-        if (!isDeferredLogSyncEnabled()) {
+        // do what table defaults to
+        if (shouldSyncLog()) {
           this.log.sync(txid);
         }
         break;
@@ -5509,10 +5536,11 @@ public class HRegion implements HeapSize
   }
 
   /**
-   * check if current region is deferred sync enabled.
+   * Check whether we should sync the log from the table's durability settings
    */
-  private boolean isDeferredLogSyncEnabled() {
-    return (this.htableDescriptor.isDeferredLogFlush() && !this.deferredLogSyncDisabled);
+  private boolean shouldSyncLog() {
+    return this.deferredLogSyncDisabled ||
+        durability.ordinal() >  Durability.ASYNC_WAL.ordinal();
   }
 
   /**

Modified: hbase/branches/0.95/hbase-server/src/main/ruby/hbase/admin.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/ruby/hbase/admin.rb?rev=1502077&r1=1502076&r2=1502077&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/ruby/hbase/admin.rb (original)
+++ hbase/branches/0.95/hbase-server/src/main/ruby/hbase/admin.rb Thu Jul 11 01:03:48 2013
@@ -261,6 +261,7 @@ module Hbase
         htd.setReadOnly(JBoolean.valueOf(arg.delete(READONLY))) if arg[READONLY]
         htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(MEMSTORE_FLUSHSIZE))) if arg[MEMSTORE_FLUSHSIZE]
         htd.setDeferredLogFlush(JBoolean.valueOf(arg.delete(DEFERRED_LOG_FLUSH))) if arg[DEFERRED_LOG_FLUSH]
+        htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(DURABILITY))) if arg[DURABILITY]
         set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA]
         set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
         
@@ -469,6 +470,7 @@ module Hbase
         htd.setReadOnly(JBoolean.valueOf(arg.delete(READONLY))) if arg[READONLY]
         htd.setMemStoreFlushSize(JLong.valueOf(arg.delete(MEMSTORE_FLUSHSIZE))) if arg[MEMSTORE_FLUSHSIZE]
         htd.setDeferredLogFlush(JBoolean.valueOf(arg.delete(DEFERRED_LOG_FLUSH))) if arg[DEFERRED_LOG_FLUSH]
+        htd.setDurability(org.apache.hadoop.hbase.client.Durability.valueOf(arg.delete(DURABILITY))) if arg[DURABILITY]
         set_user_metadata(htd, arg.delete(METADATA)) if arg[METADATA]
         set_descriptor_config(htd, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
 

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java?rev=1502077&r1=1502076&r2=1502077&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
(original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestHTableDescriptor.java
Thu Jul 11 01:03:48 2013
@@ -27,6 +27,7 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -46,14 +47,14 @@ public class TestHTableDescriptor {
     HTableDescriptor htd = new HTableDescriptor(HTableDescriptor.META_TABLEDESC);
     final int v = 123;
     htd.setMaxFileSize(v);
-    htd.setDeferredLogFlush(true);
+    htd.setDurability(Durability.ASYNC_WAL);
     htd.setReadOnly(true);
     byte [] bytes = htd.toByteArray();
     HTableDescriptor deserializedHtd = HTableDescriptor.parseFrom(bytes);
     assertEquals(htd, deserializedHtd);
     assertEquals(v, deserializedHtd.getMaxFileSize());
     assertTrue(deserializedHtd.isReadOnly());
-    assertTrue(deserializedHtd.isDeferredLogFlush());
+    assertEquals(Durability.ASYNC_WAL, deserializedHtd.getDurability());
   }
 
   /**

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1502077&r1=1502076&r2=1502077&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
(original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Thu Jul 11 01:03:48 2013
@@ -18,6 +18,14 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -27,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -56,6 +65,7 @@ import org.apache.hadoop.hbase.MiniHBase
 import org.apache.hadoop.hbase.MultithreadedTestUtil;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -3868,6 +3878,107 @@ public class TestHRegion extends HBaseTe
     assertEquals(Bytes.toBytes("value1"), kvs.get(0).getValue());
   }
 
+  @Test
+  public void testDurability() throws Exception {
+    String method = "testDurability";
+    // there are 5 x 5 cases:
+    // table durability(SYNC,FSYNC,ASYNC,SKIP,USE_DEFAULT) x mutation durability(SYNC,FSYNC,ASYNC,SKIP,USE_DEFAULT)
+
+    // expected cases for append and sync wal
+    durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
+    durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
+    durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
+
+    durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
+    durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
+    durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
+
+    durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
+    durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
+
+    durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false);
+    durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false);
+
+    durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false);
+    durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false);
+    durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false);
+
+    // expected cases for async wal
+    // do not sync for deferred flush with large optionallogflushinterval
+    conf.setLong("hbase.regionserver.optionallogflushinterval", Integer.MAX_VALUE);
+    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
+    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
+    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
+    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false);
+    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false);
+    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false);
+
+    // now small deferred log flush optionallogflushinterval, expect sync
+    conf.setLong("hbase.regionserver.optionallogflushinterval", 5);
+    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
+    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
+    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
+    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
+    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true);
+    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
+
+    // expect skip wal cases
+    durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
+    durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
+    durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
+    durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
+    durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false);
+    durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
+
+  }
+
+  private void durabilityTest(String method, Durability tableDurability,
+      Durability mutationDurability, long timeout, boolean expectAppend,
+      final boolean expectSync, final boolean expectSyncFromLogSyncer) throws Exception {
+    method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
+    byte[] tableName = Bytes.toBytes(method);
+    byte[] family = Bytes.toBytes("family");
+    Path logDir = new Path(new Path(DIR + method), "log");
+    HLog hlog = HLogFactory.createHLog(fs, logDir, UUID.randomUUID().toString(), conf);
+    final HLog log = spy(hlog);
+    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
+      HConstants.EMPTY_END_ROW, method, conf, false,
+      tableDurability, log, new byte[][] {family});
+
+    Put put = new Put(Bytes.toBytes("r1"));
+    put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
+    put.setDurability(mutationDurability);
+    region.put(put);
+
+    //verify append called or not
+    verify(log, expectAppend ? times(1) : never())
+      .appendNoSync((HRegionInfo)any(), eq(tableName),
+        (WALEdit)any(), (UUID)any(), anyLong(), (HTableDescriptor)any());
+
+    //verify sync called or not
+    if (expectSync || expectSyncFromLogSyncer) {
+      TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          try {
+            if (expectSync) {
+              verify(log, times(1)).sync(anyLong()); //Hregion calls this one
+            } else if (expectSyncFromLogSyncer) {
+              verify(log, times(1)).sync(); //log syncer calls this one
+            }
+          } catch (Throwable ignore) {}
+          return true;
+        }
+      });
+    } else {
+      verify(log, never()).sync(anyLong());
+      verify(log, never()).sync();
+    }
+
+    hlog.close();
+    region.close();
+  }
+
   private void putData(int startRow, int numRows, byte [] qf,
       byte [] ...families)
   throws IOException {
@@ -3997,6 +4108,13 @@ public class TestHRegion extends HBaseTe
     return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
   }
 
+  private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
+      String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
+      throws IOException {
+    return initHRegion(tableName, startKey, stopKey, callingMethod, conf, isReadOnly,
+      Durability.SYNC_WAL, null, families);
+  }
+
   /**
    * @param tableName
    * @param startKey
@@ -4009,7 +4127,8 @@ public class TestHRegion extends HBaseTe
    * @return A region on which you must call {@link HRegion#closeHRegion(HRegion)} when done.
    */
   private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
-      String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
+      String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
+      HLog hlog, byte[]... families)
       throws IOException {
     HTableDescriptor htd = new HTableDescriptor(tableName);
     htd.setReadOnly(isReadOnly);
@@ -4019,6 +4138,7 @@ public class TestHRegion extends HBaseTe
       hcd.setMaxVersions(Integer.MAX_VALUE);
       htd.addFamily(hcd);
     }
+    htd.setDurability(durability);
     HRegionInfo info = new HRegionInfo(htd.getName(), startKey, stopKey, false);
     Path path = new Path(DIR + callingMethod);
     FileSystem fs = FileSystem.get(conf);
@@ -4027,7 +4147,7 @@ public class TestHRegion extends HBaseTe
         throw new IOException("Failed delete of " + path);
       }
     }
-    return HRegion.createHRegion(info, path, conf, htd);
+    return HRegion.createHRegion(info, path, conf, htd, hlog);
   }
 
   /**



Mime
View raw message