accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [05/18] git commit: ACCUMULO-1957 per-table durability settings
Date Fri, 05 Sep 2014 21:17:21 GMT
ACCUMULO-1957 per-table durability settings


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e3aa7eac
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e3aa7eac
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e3aa7eac

Branch: refs/heads/master
Commit: e3aa7eac9ee5a8ac79b481cae5d8a47d62a104b5
Parents: 17f6250
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Wed Aug 27 15:29:32 2014 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Fri Sep 5 17:16:58 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |  13 +-
 .../apache/accumulo/core/conf/PropertyType.java |   2 +
 .../apache/accumulo/server/init/Initialize.java |   2 +-
 .../accumulo/tserver/TabletMutations.java       |  10 +-
 .../apache/accumulo/tserver/TabletServer.java   |   3 +-
 .../apache/accumulo/tserver/log/DfsLogger.java  |  65 +++++---
 .../tserver/log/TabletServerLogger.java         |  19 +--
 .../accumulo/tserver/tablet/CommitSession.java  |   4 +-
 .../accumulo/tserver/tablet/Durability.java     |  33 ++++
 .../apache/accumulo/tserver/tablet/Tablet.java  |   4 +-
 .../tserver/tablet/TabletCommitter.java         |   2 +-
 .../accumulo/test/functional/DurabilityIT.java  | 163 +++++++++++++++++++
 12 files changed, 275 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 9837867..72d9aa1 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -276,10 +276,8 @@ public enum Property {
       "The number of threads for the distributed work queue. These threads are used for copying
failed bulk files."),
   TSERV_WAL_SYNC("tserver.wal.sync", "true", PropertyType.BOOLEAN,
       "Use the SYNC_BLOCK create flag to sync WAL writes to disk. Prevents problems recovering
from sudden system resets."),
-  TSERV_WAL_SYNC_METHOD("tserver.wal.sync.method", "hsync", PropertyType.STRING, "The method
to invoke when sync'ing WALs. HSync will provide " +
-      "resiliency in the face of unexpected power outages, at the cost of speed. If method
is not available, the legacy 'sync' method " +
-      "will be used to ensure backwards compatibility with older Hadoop versions. A value
of 'hflush' is the alternative to the default value " +
-      "of 'hsync' which will result in faster writes, but with less durability"),
+  @Deprecated
+  TSERV_WAL_SYNC_METHOD("tserver.wal.sync.method", "hsync", PropertyType.STRING, "This property
is deprecated. Use table.durability instead."),
   TSERV_REPLICATION_REPLAYERS("tserver.replication.replayer.", null, PropertyType.PREFIX,
"Allows configuration of implementation used to apply replicated data"),
   TSERV_REPLICATION_DEFAULT_HANDLER("tserver.replication.default.replayer", "org.apache.accumulo.tserver.replication.BatchWriterReplicationReplayer",
       PropertyType.CLASSNAME, "Default AccumuloReplicationReplayer implementation"),
@@ -379,7 +377,8 @@ public enum Property {
       "Determines the max # of files each tablet in a table can have. When adjusting this
property you may want to consider adjusting"
           + " table.compaction.major.ratio also. Setting this property to 0 will make it
default to tserver.scan.files.open.max-1, this will prevent a"
           + " tablet from having more files than can be opened. Setting this property low
may throttle ingest and increase query performance."),
-  TABLE_WALOG_ENABLED("table.walog.enabled", "true", PropertyType.BOOLEAN, "Use the write-ahead
log to prevent the loss of data."),
+  @Deprecated
+  TABLE_WALOG_ENABLED("table.walog.enabled", "true", PropertyType.BOOLEAN, "This setting
is deprecated.  Use table.durability=none instead."),
   TABLE_BLOOM_ENABLED("table.bloom.enabled", "false", PropertyType.BOOLEAN, "Use bloom filters
on this table."),
   TABLE_BLOOM_LOAD_THRESHOLD("table.bloom.load.threshold", "1", PropertyType.COUNT,
       "This number of seeks that would actually use a bloom filter must occur before a file's
bloom filter is loaded."
@@ -391,6 +390,10 @@ public enum Property {
           + ",org.apache.accumulo.core.file.keyfunctor.ColumnFamilyFunctor, and org.apache.accumulo.core.file.keyfunctor.ColumnQualifierFunctor
are"
           + " allowable values. One can extend any of the above mentioned classes to perform
specialized parsing of the key. "),
   TABLE_BLOOM_HASHTYPE("table.bloom.hash.type", "murmur", PropertyType.STRING, "The bloom
filter hash type"),
+  TABLE_DURABILITY("table.durability", "sync", PropertyType.DURABILITY, "The durability used
to write to the write-ahead log." + 
+      " Legal values are: none, which skips the write-ahead log; log, which writes to the write-ahead log without flushing or syncing; " + 
+      "flush, which pushes data to the file system; and " + 
+      "sync, which ensures the data is written to disk."),
   TABLE_FAILURES_IGNORE("table.failures.ignore", "false", PropertyType.BOOLEAN,
       "If you want queries for your table to hang or fail when data is missing from the system,
"
           + "then set this to false. When this set to true missing data will be reported
but queries "

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
index f39a8bd..5d5dd5f 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
@@ -67,6 +67,8 @@ public enum PropertyType {
 
   CLASSNAME("java class", "[\\w$.]*", "A fully qualified java class name representing a class
on the classpath.\n"
       + "An example is 'java.lang.String', rather than 'String'"),
+      
+  DURABILITY("durability", "(?:none|log|flush|sync)", "One of 'none', 'log', 'flush' or 'sync'."),
 
   STRING("string", ".*",
       "An arbitrary string of characters whose format is unspecified and interpreted based
on the context of the property to which it applies."), BOOLEAN(

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 9b952ba..5f1e287 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -130,7 +130,7 @@ public class Initialize {
   static {
     initialMetadataConf.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "32K");
     initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), "5");
-    initialMetadataConf.put(Property.TABLE_WALOG_ENABLED.getKey(), "true");
+    initialMetadataConf.put(Property.TABLE_DURABILITY.getKey(), "sync");
     initialMetadataConf.put(Property.TABLE_MAJC_RATIO.getKey(), "1");
     initialMetadataConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "64M");
     initialMetadataConf.put(Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1", MetadataConstraints.class.getName());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
index e814f0e..a30fa02 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
@@ -19,16 +19,19 @@ package org.apache.accumulo.tserver;
 import java.util.List;
 
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.tserver.tablet.Durability;
 
 public class TabletMutations {
   private final int tid; 
   private final int seq; 
   private final List<Mutation> mutations;
+  private final Durability durability;
 
-  public TabletMutations(int tid, int seq, List<Mutation> mutations) {
+  public TabletMutations(int tid, int seq, List<Mutation> mutations, Durability durability)
{
     this.tid = tid;
     this.seq = seq;
     this.mutations = mutations;
+    this.durability = durability;
   }
 
   public List<Mutation> getMutations() {
@@ -38,10 +41,13 @@ public class TabletMutations {
   public int getTid() {
     return tid;
   }
+  
   public int getSeq() {
     return seq;
   }
   
-  
+  public Durability getDurability() {
+    return durability;
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 57e3dee..63bf4a3 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -215,6 +215,7 @@ import org.apache.accumulo.tserver.tablet.CommitSession;
 import org.apache.accumulo.tserver.tablet.CompactionInfo;
 import org.apache.accumulo.tserver.tablet.CompactionWatcher;
 import org.apache.accumulo.tserver.tablet.Compactor;
+import org.apache.accumulo.tserver.tablet.Durability;
 import org.apache.accumulo.tserver.tablet.KVEntry;
 import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.apache.accumulo.tserver.tablet.Scanner;
@@ -2899,7 +2900,7 @@ public class TabletServer implements Runnable {
 
   public int createLogId(KeyExtent tablet) {
     AccumuloConfiguration acuTableConf = getTableConfiguration(tablet);
-    if (acuTableConf.getBoolean(Property.TABLE_WALOG_ENABLED)) {
+    if (Durability.fromString(acuTableConf.get(Property.TABLE_DURABILITY)) != Durability.NONE)
{
       return logIdGenerator.incrementAndGet();
     }
     return -1;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index c01e54a..d907ee7 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -59,6 +59,7 @@ import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.tserver.TabletMutations;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.accumulo.tserver.tablet.Durability;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -124,7 +125,7 @@ public class DfsLogger {
 
   private final Object closeLock = new Object();
 
-  private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null);
+  private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null, Durability.FLUSH);
 
   private static final LogFileValue EMPTY = new LogFileValue();
 
@@ -145,9 +146,31 @@ public class DfsLogger {
           continue;
         }
         workQueue.drainTo(work);
+        
+        Method durabilityMethod = null;
+        loop:
+        for (LogWork logWork : work) {
+          switch (logWork.durability) {
+            case NONE:
+              // shouldn't make it to the work queue
+              break;
+            case LOG:
+              // do nothing
+              break;
+            case SYNC:
+              durabilityMethod = sync;
+              break loop;
+            case FLUSH:
+              if (durabilityMethod == null) {
+                durabilityMethod = flush;
+              }
+              break;
+          }
+        }
 
         try {
-          sync.invoke(logFile);
+          if (durabilityMethod != null)
+            durabilityMethod.invoke(logFile);
         } catch (Exception ex) {
           log.warn("Exception syncing " + ex);
           for (DfsLogger.LogWork logWork : work) {
@@ -165,11 +188,13 @@ public class DfsLogger {
   }
 
   static class LogWork {
-    CountDownLatch latch;
+    final CountDownLatch latch;
+    final Durability durability;
     volatile Exception exception;
 
-    public LogWork(CountDownLatch latch) {
+    public LogWork(CountDownLatch latch, Durability durability) {
       this.latch = latch;
+      this.durability = durability;
     }
   }
 
@@ -213,11 +238,12 @@ public class DfsLogger {
     // filename is unique
     return getFileName().hashCode();
   }
-
+  
   private final ServerResources conf;
   private FSDataOutputStream logFile;
   private DataOutputStream encryptingLogFile = null;
   private Method sync;
+  private Method flush;
   private String logPath;
   private Daemon syncThread;
 
@@ -337,16 +363,13 @@ public class DfsLogger {
       else
         logFile = fs.create(new Path(logPath), true, 0, replication, blockSize);
 
-      String syncMethod = conf.getConfiguration().get(Property.TSERV_WAL_SYNC_METHOD);
       try {
-        // hsync: send data to datanodes and sync the data to disk
-        sync = logFile.getClass().getMethod(syncMethod);
+        sync = logFile.getClass().getMethod("hsync");
+        flush = logFile.getClass().getMethod("hflush");
       } catch (Exception ex) {
-        log.warn("Could not find configured " + syncMethod + " method, trying to fall back
to old Hadoop sync method", ex);
-
         try {
-          // sync: send data to datanodes
-          sync = logFile.getClass().getMethod("sync");
+          // fall back to sync: send data to datanodes
+          flush = sync = logFile.getClass().getMethod("sync");
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
@@ -387,7 +410,6 @@ public class DfsLogger {
       key.tserverSession = filename;
       key.filename = filename;
       write(key, EMPTY);
-      sync.invoke(logFile);
       log.debug("Got new write-ahead log: " + this);
     } catch (Exception ex) {
       if (logFile != null)
@@ -499,12 +521,12 @@ public class DfsLogger {
     encryptingLogFile.flush();
   }
 
-  public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException {
-    return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation))));
+  public LoggerOperation log(int seq, int tid, Mutation mutation, Durability durability)
throws IOException {
+    return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation),
durability)));
   }
 
-  private LoggerOperation logFileData(List<Pair<LogFileKey,LogFileValue>> keys)
throws IOException {
-    DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1));
+  private LoggerOperation logFileData(List<Pair<LogFileKey,LogFileValue>> keys,
Durability durability) throws IOException {
+    DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1), durability);
     synchronized (DfsLogger.this) {
       try {
         for (Pair<LogFileKey,LogFileValue> pair : keys) {
@@ -531,6 +553,7 @@ public class DfsLogger {
   }
 
   public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException
{
+    Durability durability = Durability.NONE;
     List<Pair<LogFileKey,LogFileValue>> data = new ArrayList<Pair<LogFileKey,LogFileValue>>();
     for (TabletMutations tabletMutations : mutations) {
       LogFileKey key = new LogFileKey();
@@ -540,8 +563,10 @@ public class DfsLogger {
       LogFileValue value = new LogFileValue();
       value.mutations = tabletMutations.getMutations();
       data.add(new Pair<LogFileKey,LogFileValue>(key, value));
+      if (tabletMutations.getDurability().ordinal() > durability.ordinal())
+        durability = tabletMutations.getDurability();
     }
-    return logFileData(data);
+    return logFileData(data, durability);
   }
 
   public LoggerOperation minorCompactionFinished(int seq, int tid, String fqfn) throws IOException
{
@@ -549,7 +574,7 @@ public class DfsLogger {
     key.event = COMPACTION_FINISH;
     key.seq = seq;
     key.tid = tid;
-    return logFileData(Collections.singletonList(new Pair<LogFileKey,LogFileValue>(key,
EMPTY)));
+    return logFileData(Collections.singletonList(new Pair<LogFileKey,LogFileValue>(key,
EMPTY)), Durability.SYNC);
   }
 
   public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn) throws IOException
{
@@ -558,7 +583,7 @@ public class DfsLogger {
     key.seq = seq;
     key.tid = tid;
     key.filename = fqfn;
-    return logFileData(Collections.singletonList(new Pair<LogFileKey,LogFileValue>(key,
EMPTY)));
+    return logFileData(Collections.singletonList(new Pair<LogFileKey,LogFileValue>(key,
EMPTY)), Durability.SYNC);
   }
 
   public String getLogger() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 26e6891..56998d4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -47,6 +47,7 @@ import org.apache.accumulo.tserver.TabletMutations;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
 import org.apache.accumulo.tserver.tablet.CommitSession;
+import org.apache.accumulo.tserver.tablet.Durability;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
@@ -83,14 +84,6 @@ public class TabletServerLogger {
 
   private final AtomicInteger seqGen = new AtomicInteger();
 
-  private static boolean enabled(TableConfiguration tconf) {
-    return tconf.getBoolean(Property.TABLE_WALOG_ENABLED);
-  }
-
-  private static boolean enabled(CommitSession commitSession) {
-    return commitSession.getUseWAL();
-  }
-
   static private abstract class TestCallWithWriteLock {
     abstract boolean test();
 
@@ -369,13 +362,17 @@ public class TabletServerLogger {
     });
   }
 
+  private boolean enabled(CommitSession commitSession) {
+    return commitSession.getDurabilty() != Durability.NONE;
+  }
+
   public int log(final CommitSession commitSession, final int tabletSeq, final Mutation m)
throws IOException {
     if (!enabled(commitSession))
       return -1;
     int seq = write(commitSession, false, new Writer() {
       @Override
       public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
-        return logger.log(tabletSeq, commitSession.getLogId(), m);
+        return logger.log(tabletSeq, commitSession.getLogId(), m, commitSession.getDurabilty());
       }
     });
     logSizeEstimate.addAndGet(m.numBytes());
@@ -398,7 +395,7 @@ public class TabletServerLogger {
         List<TabletMutations> copy = new ArrayList<TabletMutations>(loggables.size());
         for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet())
{
           CommitSession cs = entry.getKey();
-          copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), entry.getValue()));
+          copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), entry.getValue(),
cs.getDurabilty()));
         }
         return logger.logManyTablets(copy);
       }
@@ -448,7 +445,7 @@ public class TabletServerLogger {
 
   public void recover(VolumeManager fs, KeyExtent extent, TableConfiguration tconf, List<Path>
logs, Set<String> tabletFiles, MutationReceiver mr)
       throws IOException {
-    if (!enabled(tconf))
+    if (Durability.fromString(tconf.get(Property.TABLE_DURABILITY)) == Durability.NONE)
       return;
     try {
       SortedLogRecovery recovery = new SortedLogRecovery(fs);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
index 6402797..b2d89c9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
@@ -111,8 +111,8 @@ public class CommitSession {
     return maxCommittedTime;
   }
 
-  public boolean getUseWAL() {
-    return committer.getUseWAL();
+  public Durability getDurabilty() {
+    return committer.getDurability();
   }
 
   public void mutate(List<Mutation> mutations) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Durability.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Durability.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Durability.java
new file mode 100644
index 0000000..675b196
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Durability.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+public enum Durability {
+  NONE,
+  LOG,
+  FLUSH,
+  SYNC;
+  
+  static public Durability fromString(String value) {
+    try {
+      return Durability.valueOf(value.toUpperCase());
+    } catch (IllegalArgumentException ex) {
+      return Durability.SYNC;
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 37950fc..fdf072a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -2510,8 +2510,8 @@ public class Tablet implements TabletCommitter {
   }
 
   @Override
-  public boolean getUseWAL() {
-    return getTableConfiguration().getBoolean(Property.TABLE_WALOG_ENABLED);
+  public Durability getDurability() {
+    return Durability.fromString(getTableConfiguration().get(Property.TABLE_DURABILITY));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
index a5d197c..b6bb458 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
@@ -44,7 +44,7 @@ public interface TabletCommitter {
 
   int getLogId();
 
-  boolean getUseWAL();
+  Durability getDurability();
 
   void updateMemoryUsageStats(long estimatedSizeInBytes, long estimatedSizeInBytes2);
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e3aa7eac/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java b/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
new file mode 100644
index 0000000..b4d9c83
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class DurabilityIT extends ConfigurableMacIT {
+  
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.useMiniDFS(true);
+    cfg.setNumTservers(1);
+  }
+  
+  static final long N = 100000;
+  
+  String tableNames[] = null;
+  
+  void init() throws Exception {
+    synchronized (this) {
+      if (tableNames == null) {
+        tableNames = getUniqueNames(4);
+        Connector c = getConnector();
+        TableOperations tableOps = c.tableOperations();
+        tableOps.create(tableNames[0]); 
+        tableOps.create(tableNames[1]);
+        tableOps.create(tableNames[2]);
+        tableOps.create(tableNames[3]);
+        // default is sync
+        tableOps.setProperty(tableNames[1], Property.TABLE_DURABILITY.getKey(), "flush");
+        tableOps.setProperty(tableNames[2], Property.TABLE_DURABILITY.getKey(), "log");
+        tableOps.setProperty(tableNames[3], Property.TABLE_DURABILITY.getKey(), "none");
+        // zookeeper propagation
+        UtilWaitThread.sleep(2 * 1000);
+      }
+    }
+  }
+
+  @Test(timeout = 2 * 60 * 1000)
+  public void testWriteSpeed() throws Exception {
+    init();
+    // write some gunk
+    long t0 = writeSome(tableNames[0], N); flush(tableNames[0]);
+    long t1 = writeSome(tableNames[1], N); flush(tableNames[1]);
+    long t2 = writeSome(tableNames[2], N); flush(tableNames[2]);
+    long t3 = writeSome(tableNames[3], N); flush(tableNames[3]);
+    System.out.println(String.format("t0 %d t1 %d t2 %d t3 %d", t0, t1, t2, t3));
+    assertTrue(t0 > t1);
+    assertTrue(t1 > t2);
+    assertTrue(t2 > t3);
+  }
+  
+  @Test(timeout = 4 * 60 * 1000)
+  public void testSync() throws Exception {
+    init();
+    // sync table should lose nothing
+    getConnector().tableOperations().deleteRows(tableNames[0], null, null);
+    writeSome(tableNames[0], N);
+    restartTServer();
+    assertEquals(N, readSome(tableNames[0], N));
+  }
+
+  @Test(timeout = 4 * 60 * 1000)
+  public void testFlush() throws Exception {
+    init();
+    // flush table won't lose anything since we're not losing power/dfs
+    getConnector().tableOperations().deleteRows(tableNames[1], null, null); 
+    writeSome(tableNames[1], N);
+    restartTServer();
+    assertEquals(N, readSome(tableNames[1], N));
+  }
+
+  @Test(timeout = 4 * 60 * 1000)
+  public void testLog() throws Exception {
+    init();
+    // we're probably going to lose something with the log setting
+    getConnector().tableOperations().deleteRows(tableNames[2], null, null); 
+    writeSome(tableNames[2], N);
+    restartTServer();
+    assertTrue(N > readSome(tableNames[2], N));
+  }
+  
+  @Test(timeout = 4 * 60 * 1000)
+  public void testNone() throws Exception {
+    init();
+    // probably won't get any data back without logging
+    getConnector().tableOperations().deleteRows(tableNames[3], null, null);
+    writeSome(tableNames[3], N);
+    restartTServer();
+    assertTrue(N > readSome(tableNames[3], N));
+  }
+
+  private long readSome(String table, long n) throws Exception {
+    long count = 0;
+    for (@SuppressWarnings("unused") Entry<Key,Value> entry : getConnector().createScanner(table,
Authorizations.EMPTY)) {
+      count++;
+    }
+    return count;
+  }
+
+  private void restartTServer() throws Exception {
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+      cluster.killProcess(ServerType.TABLET_SERVER, proc);
+    }
+    cluster.start();
+  }
+
+  private void flush(String table) throws Exception {
+    getConnector().tableOperations().flush(table, null, null, true);
+  }
+
+  private long writeSome(String table, long count) throws Exception {
+    long now = System.currentTimeMillis();
+    Connector c = getConnector();
+    BatchWriter bw = c.createBatchWriter(table, null);
+    for (int i = 1; i < count + 1; i++) {
+      String data = "" + i;
+      Mutation m = new Mutation("" + i);
+      m.put(data, data, data);
+      bw.addMutation(m);
+      if (i % (count/100) == 0) {
+        bw.flush();
+      }
+    }
+    bw.close();
+    long result = System.currentTimeMillis() - now;
+    c.tableOperations().flush(table, null, null, true);
+    return result;
+  }
+  
+}


Mime
View raw message