accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [01/18] ACCUMULO-1957 implement per-session Durability
Date Fri, 05 Sep 2014 21:17:17 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 17f625034 -> 6e02b3c7b


http://git-wip-us.apache.org/repos/asf/accumulo/blob/c2d95a1d/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
index edd12aa..f9c443a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
@@ -19,7 +19,7 @@ package org.apache.accumulo.tserver;
 import java.util.List;
 
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.tserver.tablet.Durability;
+import org.apache.accumulo.core.client.Durability;
 
 public class TabletMutations {
   private final int tid; 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c2d95a1d/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 63bf4a3..57de347 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -56,6 +56,7 @@ import javax.management.StandardMBean;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Durability;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.CompressedIterators;
 import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
@@ -115,6 +116,7 @@ import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TDurability;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
@@ -215,7 +217,6 @@ import org.apache.accumulo.tserver.tablet.CommitSession;
 import org.apache.accumulo.tserver.tablet.CompactionInfo;
 import org.apache.accumulo.tserver.tablet.CompactionWatcher;
 import org.apache.accumulo.tserver.tablet.Compactor;
-import org.apache.accumulo.tserver.tablet.Durability;
 import org.apache.accumulo.tserver.tablet.KVEntry;
 import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.apache.accumulo.tserver.tablet.Scanner;
@@ -652,14 +653,14 @@ public class TabletServer implements Runnable {
     }
 
     @Override
-    public long startUpdate(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException {
+    public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability tdurabilty) throws ThriftSecurityException {
       // Make sure user is real
-
+      Durability durability = Durability.fromThrift(tdurabilty);
       security.authenticateUser(credentials, credentials);
       if (updateMetrics.isEnabled())
         updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
 
-      UpdateSession us = new UpdateSession(new TservConstraintEnv(security, credentials), credentials);
+      UpdateSession us = new UpdateSession(new TservConstraintEnv(security, credentials), credentials, durability);
       long sid = sessionManager.createSession(us, false);
       return sid;
     }
@@ -738,11 +739,11 @@ public class TabletServer implements Runnable {
         sessionManager.unreserveSession(us);
       }
     }
-
+    
     private void flush(UpdateSession us) {
 
       int mutationCount = 0;
-      Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
+      Map<CommitSession,Mutations> sendables = new HashMap<CommitSession,Mutations>();
       Throwable error = null;
 
       long pt1 = System.currentTimeMillis();
@@ -760,6 +761,7 @@ public class TabletServer implements Runnable {
         for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
 
           Tablet tablet = entry.getKey();
+          Durability tabletDurability = tablet.getDurability();
           List<Mutation> mutations = entry.getValue();
           if (mutations.size() > 0) {
             try {
@@ -773,7 +775,8 @@ public class TabletServer implements Runnable {
                 }
                 us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
               } else {
-                sendables.put(commitSession, mutations);
+                log.debug("Durablity for " + tablet.getExtent() + " durability " + us.durability + " table durability " + tabletDurability + " using " + us.durability.resolveDurability(tabletDurability));
+                sendables.put(commitSession, new Mutations(us.durability.resolveDurability(tabletDurability), mutations));
                 mutationCount += mutations.size();
               }
 
@@ -788,7 +791,7 @@ public class TabletServer implements Runnable {
                 // violate constraints... this is what
                 // prepareMutationsForCommit()
                 // expects
-                sendables.put(e.getCommitSession(), e.getNonViolators());
+                sendables.put(e.getCommitSession(), new Mutations(us.durability.resolveDurability(tabletDurability), e.getNonViolators()));
               }
 
               mutationCount += mutations.size();
@@ -813,8 +816,8 @@ public class TabletServer implements Runnable {
       updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
 
       if (error != null) {
-        for (Entry<CommitSession,List<Mutation>> e : sendables.entrySet()) {
-          e.getKey().abortCommit(e.getValue());
+        for (Entry<CommitSession,Mutations> e : sendables.entrySet()) {
+          e.getKey().abortCommit(e.getValue().getMutations());
         }
         throw new RuntimeException(error);
       }
@@ -847,9 +850,9 @@ public class TabletServer implements Runnable {
         Span commit = Trace.start("commit");
         try {
           long t1 = System.currentTimeMillis();
-          for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
+          for (Entry<CommitSession,Mutations> entry : sendables.entrySet()) {
             CommitSession commitSession = entry.getKey();
-            List<Mutation> mutations = entry.getValue();
+            List<Mutation> mutations = entry.getValue().getMutations();
 
             commitSession.commit(mutations);
 
@@ -937,7 +940,7 @@ public class TabletServer implements Runnable {
     }
 
     @Override
-    public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, TMutation tmutation) throws NotServingTabletException,
+    public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent, TMutation tmutation, TDurability tdurability) throws NotServingTabletException,
         ConstraintViolationException, ThriftSecurityException {
 
       final String tableId = new String(tkeyExtent.getTable(), StandardCharsets.UTF_8);
@@ -948,6 +951,7 @@ public class TabletServer implements Runnable {
       if (tablet == null) {
         throw new NotServingTabletException(tkeyExtent);
       }
+      Durability tabletDurability = tablet.getDurability();
 
       if (!keyExtent.isMeta())
         TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
@@ -973,7 +977,7 @@ public class TabletServer implements Runnable {
           try {
             final Span wal = Trace.start("wal");
             try {
-              logger.log(cs, cs.getWALogSeq(), mutation);
+              logger.log(cs, cs.getWALogSeq(), mutation, Durability.fromThrift(tdurability).resolveDurability(tabletDurability));
             } finally {
               wal.stop();
             }
@@ -1076,7 +1080,7 @@ public class TabletServer implements Runnable {
     private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession sess) {
       Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
 
-      Map<CommitSession,List<Mutation>> sendables = new HashMap<CommitSession,List<Mutation>>();
+      Map<CommitSession,Mutations> sendables = new HashMap<CommitSession,Mutations>();
 
       boolean sessionCanceled = sess.interruptFlag.get();
 
@@ -1085,6 +1089,7 @@ public class TabletServer implements Runnable {
         long t1 = System.currentTimeMillis();
         for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
           Tablet tablet = onlineTablets.get(entry.getKey());
+          Durability tabletDurability = tablet.getDurability();
           if (tablet == null || tablet.isClosed() || sessionCanceled) {
             for (ServerConditionalMutation scm : entry.getValue())
               results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
@@ -1096,19 +1101,19 @@ public class TabletServer implements Runnable {
               if (mutations.size() > 0) {
 
                 CommitSession cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, sess.credentials), mutations);
-
+                
                 if (cs == null) {
                   for (ServerConditionalMutation scm : entry.getValue())
                     results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
                 } else {
                   for (ServerConditionalMutation scm : entry.getValue())
                     results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
-                  sendables.put(cs, mutations);
+                  sendables.put(cs, new Mutations(sess.durability.resolveDurability(tabletDurability), mutations));
                 }
               }
             } catch (TConstraintViolationException e) {
               if (e.getNonViolators().size() > 0) {
-                sendables.put(e.getCommitSession(), e.getNonViolators());
+                sendables.put(e.getCommitSession(), new Mutations(sess.durability.resolveDurability(tabletDurability), e.getNonViolators()));
                 for (Mutation m : e.getNonViolators())
                   results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
               }
@@ -1150,9 +1155,9 @@ public class TabletServer implements Runnable {
       Span commitSpan = Trace.start("commit");
       try {
         long t1 = System.currentTimeMillis();
-        for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) {
+        for (Entry<CommitSession,Mutations> entry : sendables.entrySet()) {
           CommitSession commitSession = entry.getKey();
-          List<Mutation> mutations = entry.getValue();
+          List<Mutation> mutations = entry.getValue().getMutations();
 
           commitSession.commit(mutations);
         }
@@ -1197,7 +1202,7 @@ public class TabletServer implements Runnable {
     }
 
     @Override
-    public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableId)
+    public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableId, TDurability tdurabilty)
         throws ThriftSecurityException, TException {
 
       Authorizations userauths = null;
@@ -1209,7 +1214,7 @@ public class TabletServer implements Runnable {
         if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
           throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
 
-      ConditionalSession cs = new ConditionalSession(credentials, new Authorizations(authorizations), tableId);
+      ConditionalSession cs = new ConditionalSession(credentials, new Authorizations(authorizations), tableId, Durability.fromThrift(tdurabilty));
 
       long sid = sessionManager.createSession(cs, false);
       return new TConditionalSession(sid, lockID, sessionManager.getMaxIdleTime());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c2d95a1d/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 443ba2e..f6829ea 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -26,7 +26,6 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.channels.ClosedChannelException;
 import java.nio.charset.StandardCharsets;
@@ -41,6 +40,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.accumulo.core.client.Durability;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.KeyExtent;
@@ -59,7 +59,6 @@ import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.tserver.TabletMutations;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
-import org.apache.accumulo.tserver.tablet.Durability;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -147,30 +146,39 @@ public class DfsLogger {
         }
         workQueue.drainTo(work);
 
+        String durability = null;
         Method durabilityMethod = null;
         loop:
         for (LogWork logWork : work) {
           switch (logWork.durability) {
+            case DEFAULT:
             case NONE:
               // shouldn't make it to the work queue
+              log.warn("unexpected durability " + logWork.durability, new Throwable());
               break;
             case LOG:
               // do nothing
               break;
             case SYNC:
               durabilityMethod = sync;
+              durability = logWork.durability.toString();
               break loop;
             case FLUSH:
               if (durabilityMethod == null) {
                 durabilityMethod = flush;
+                durability = logWork.durability.toString();
               }
               break;
           }
         }
 
         try {
-          if (durabilityMethod != null)
+          if (durabilityMethod != null) {
+            log.debug("durability method " + durability);
             durabilityMethod.invoke(logFile);
+          } else {
+            log.debug("skipping flush/sync");
+          }
         } catch (Exception ex) {
           log.warn("Exception syncing " + ex);
           for (DfsLogger.LogWork logWork : work) {
@@ -493,25 +501,9 @@ public class DfsLogger {
     key.tablet = tablet;
     try {
       write(key, EMPTY);
-      sync.invoke(logFile);
     } catch (IllegalArgumentException e) {
       log.error("Signature of sync method changed. Accumulo is likely incompatible with this version of Hadoop.");
       throw new RuntimeException(e);
-    } catch (IllegalAccessException e) {
-      log.error("Could not invoke sync method due to permission error.");
-      throw new RuntimeException(e);
-    } catch (InvocationTargetException e) {
-      Throwable cause = e.getCause();
-      if (cause instanceof IOException) {
-        throw (IOException) cause;
-      } else if (cause instanceof RuntimeException) {
-        throw (RuntimeException) cause;
-      } else if (cause instanceof Error) {
-        throw (Error) cause;
-      } else {
-        // Cause is null, or some other checked exception that was added later.
-        throw new RuntimeException(e);
-      }
     }
   }
 
@@ -563,6 +555,7 @@ public class DfsLogger {
       LogFileValue value = new LogFileValue();
       value.mutations = tabletMutations.getMutations();
       data.add(new Pair<LogFileKey,LogFileValue>(key, value));
+      log.debug("Durability for " + tabletMutations.getDurability() + " (ordinal) " + tabletMutations.getDurability().ordinal() + " durability " + durability + " (ordinal) " + durability.ordinal());
       if (tabletMutations.getDurability().ordinal() > durability.ordinal())
         durability = tabletMutations.getDurability();
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c2d95a1d/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 56998d4..cb476c9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.client.Durability;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
@@ -43,11 +43,11 @@ import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
+import org.apache.accumulo.tserver.Mutations;
 import org.apache.accumulo.tserver.TabletMutations;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
 import org.apache.accumulo.tserver.tablet.CommitSession;
-import org.apache.accumulo.tserver.tablet.Durability;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
@@ -351,8 +351,6 @@ public class TabletServerLogger {
 
   public int defineTablet(final CommitSession commitSession) throws IOException {
     // scribble this into the metadata tablet, too.
-    if (!enabled(commitSession))
-      return -1;
     return write(commitSession, false, new Writer() {
       @Override
       public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
@@ -362,29 +360,29 @@ public class TabletServerLogger {
     });
   }
 
-  private boolean enabled(CommitSession commitSession) {
-    return commitSession.getDurabilty() != Durability.NONE;
-  }
-
-  public int log(final CommitSession commitSession, final int tabletSeq, final Mutation m) throws IOException {
-    if (!enabled(commitSession))
+  public int log(final CommitSession commitSession, final int tabletSeq, final Mutation m, final Durability durability) throws IOException {
+    if (durability == Durability.NONE)
       return -1;
+    if (durability == Durability.DEFAULT)
+      log.warn("Unexpected durability " + durability, new Throwable());
     int seq = write(commitSession, false, new Writer() {
       @Override
       public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
-        return logger.log(tabletSeq, commitSession.getLogId(), m, commitSession.getDurabilty());
+        return logger.log(tabletSeq, commitSession.getLogId(), m, durability);
       }
     });
     logSizeEstimate.addAndGet(m.numBytes());
     return seq;
   }
 
-  public int logManyTablets(Map<CommitSession,List<Mutation>> mutations) throws IOException {
+  public int logManyTablets(Map<CommitSession,Mutations> mutations) throws IOException {
 
-    final Map<CommitSession,List<Mutation>> loggables = new HashMap<CommitSession,List<Mutation>>(mutations);
-    for (CommitSession t : mutations.keySet()) {
-      if (!enabled(t))
-        loggables.remove(t);
+    final Map<CommitSession,Mutations> loggables = new HashMap<CommitSession,Mutations>(mutations);
+    for (Entry<CommitSession,Mutations> entry : mutations.entrySet()) {
+      Durability durability = entry.getValue().getDurability();
+      if (durability == Durability.NONE) {
+        loggables.remove(entry.getKey());
+      }
     }
     if (loggables.size() == 0)
       return -1;
@@ -393,17 +391,18 @@ public class TabletServerLogger {
       @Override
       public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
         List<TabletMutations> copy = new ArrayList<TabletMutations>(loggables.size());
-        for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet()) {
+        for (Entry<CommitSession,Mutations> entry : loggables.entrySet()) {
           CommitSession cs = entry.getKey();
-          copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), entry.getValue(), cs.getDurabilty()));
+          Durability durability = entry.getValue().getDurability();
+          copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), entry.getValue().getMutations(), durability));
         }
         return logger.logManyTablets(copy);
       }
     });
-    for (List<Mutation> entry : loggables.values()) {
-      if (entry.size() < 1)
+    for (Mutations entry : loggables.values()) {
+      if (entry.getMutations().size() < 1)
         throw new IllegalArgumentException("logManyTablets: logging empty mutation list");
-      for (Mutation m : entry) {
+      for (Mutation m : entry.getMutations()) {
         logSizeEstimate.addAndGet(m.numBytes());
       }
     }
@@ -412,9 +411,6 @@ public class TabletServerLogger {
 
   public void minorCompactionFinished(final CommitSession commitSession, final String fullyQualifiedFileName, final int walogSeq) throws IOException {
 
-    if (!enabled(commitSession))
-      return;
-
     long t1 = System.currentTimeMillis();
 
     int seq = write(commitSession, true, new Writer() {
@@ -431,8 +427,6 @@ public class TabletServerLogger {
   }
 
   public int minorCompactionStarted(final CommitSession commitSession, final int seq, final String fullyQualifiedFileName) throws IOException {
-    if (!enabled(commitSession))
-      return -1;
     write(commitSession, false, new Writer() {
       @Override
       public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
@@ -445,8 +439,6 @@ public class TabletServerLogger {
 
   public void recover(VolumeManager fs, KeyExtent extent, TableConfiguration tconf, List<Path> logs, Set<String> tabletFiles, MutationReceiver mr)
       throws IOException {
-    if (Durability.fromString(tconf.get(Property.TABLE_DURABILITY)) == Durability.NONE)
-      return;
     try {
       SortedLogRecovery recovery = new SortedLogRecovery(fs);
       recovery.recover(extent, logs, tabletFiles, mr);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c2d95a1d/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java
index 26668f6..d2515e6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ConditionalSession.java
@@ -20,18 +20,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.client.Durability;
 
 public class ConditionalSession extends Session {
   public final TCredentials credentials;
   public final Authorizations auths;
   public final String tableId;
   public final AtomicBoolean interruptFlag = new AtomicBoolean();
+  public final Durability durability;
   
-  public ConditionalSession(TCredentials credentials, Authorizations authorizations, String tableId) {
+  public ConditionalSession(TCredentials credentials, Authorizations authorizations, String tableId, Durability durability) {
     super(credentials);
     this.credentials = credentials;
     this.auths = authorizations;
     this.tableId = tableId;
+    this.durability = durability;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c2d95a1d/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java
index bc04a85..65430ce 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.util.MapCounter;
 import org.apache.accumulo.core.util.Stat;
 import org.apache.accumulo.tserver.TservConstraintEnv;
+import org.apache.accumulo.core.client.Durability;
 import org.apache.accumulo.tserver.tablet.Tablet;
 
 public class UpdateSession extends Session {
@@ -46,11 +47,12 @@ public class UpdateSession extends Session {
   public long totalUpdates = 0;
   public long flushTime = 0;
   public long queuedMutationSize = 0;
+  public final Durability durability;
   
-  public UpdateSession(TservConstraintEnv env, TCredentials credentials) {
+  public UpdateSession(TservConstraintEnv env, TCredentials credentials, Durability durability) {
     super(credentials);
     this.cenv = env;
     this.violations = new Violations();
+    this.durability = durability;
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c2d95a1d/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
index b2d89c9..db4100f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
@@ -111,10 +111,6 @@ public class CommitSession {
     return maxCommittedTime;
   }
 
-  public Durability getDurabilty() {
-    return committer.getDurability();
-  }
-
   public void mutate(List<Mutation> mutations) {
     memTable.mutate(mutations);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c2d95a1d/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Durability.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Durability.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Durability.java
deleted file mode 100644
index a88e377..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Durability.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver.tablet;
-
-public enum Durability {
-  NONE,
-  LOG,
-  FLUSH,
-  SYNC;
-
-  static public Durability fromString(String value) {
-    try {
-      return Durability.valueOf(value.toUpperCase());
-    } catch (IllegalArgumentException ex) {
-      return Durability.SYNC;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c2d95a1d/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index fdf072a..965324a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -43,6 +43,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Durability;
 import org.apache.accumulo.core.client.impl.ScannerImpl;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationCopy;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c2d95a1d/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
index b6bb458..3042267 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.tserver.tablet;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.accumulo.core.client.Durability;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.server.conf.TableConfiguration;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c2d95a1d/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java b/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java
index aeba2e0..51880b5 100644
--- a/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.test;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.tabletserver.thrift.TDurability;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.server.cli.ClientOpts;
@@ -46,7 +47,7 @@ public class WrongTabletTest {
       Mutation mutation = new Mutation(new Text("row_0003750001"));
       mutation.putDelete(new Text("colf"), new Text("colq"));
       client.update(Tracer.traceInfo(), new Credentials(opts.principal, opts.getToken()).toThrift(opts.getInstance()), new KeyExtent(new Text("!!"), null,
-          new Text("row_0003750000")).toThrift(), mutation.toThrift());
+          new Text("row_0003750000")).toThrift(), mutation.toThrift(), TDurability.DEFAULT);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c2d95a1d/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index 6c34172..ee8a80d 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -54,6 +54,7 @@ import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.TDurability;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
@@ -93,7 +94,7 @@ public class NullTserver {
     }
     
     @Override
-    public long startUpdate(TInfo tinfo, TCredentials credentials) {
+    public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability durability) {
       return updateSession++;
     }
     
@@ -144,7 +145,7 @@ public class NullTserver {
     }
     
     @Override
-    public void update(TInfo tinfo, TCredentials credentials, TKeyExtent keyExtent, TMutation mutation) {
+    public void update(TInfo tinfo, TCredentials credentials, TKeyExtent keyExtent, TMutation mutation, TDurability durability) {
       
     }
     
@@ -207,7 +208,7 @@ public class NullTserver {
     }
     
     @Override
-    public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID)
+    public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID, TDurability durability)
         throws ThriftSecurityException, TException {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c2d95a1d/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java b/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
index 590c7c4..05e3bef 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
@@ -34,49 +34,58 @@ import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.minicluster.impl.ProcessReference;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.junit.Test;
 
 public class DurabilityIT extends ConfigurableMacIT {
 
   @Override
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.useMiniDFS(true);
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
     cfg.setNumTservers(1);
   }
 
   static final long N = 100000;
 
-  String tableNames[] = null;
-
-  void init() throws Exception {
-    synchronized (this) {
-      if (tableNames == null) {
-        tableNames = getUniqueNames(4);
-        Connector c = getConnector();
-        TableOperations tableOps = c.tableOperations();
-        tableOps.create(tableNames[0]);
-        tableOps.create(tableNames[1]);
-        tableOps.create(tableNames[2]);
-        tableOps.create(tableNames[3]);
-        // default is sync
-        tableOps.setProperty(tableNames[1], Property.TABLE_DURABILITY.getKey(), "flush");
-        tableOps.setProperty(tableNames[2], Property.TABLE_DURABILITY.getKey(), "log");
-        tableOps.setProperty(tableNames[3], Property.TABLE_DURABILITY.getKey(), "none");
-        // zookeeper propagation
-        UtilWaitThread.sleep(2 * 1000);
-      }
+  private String[] init() throws Exception {
+    String[] tableNames = getUniqueNames(4);
+    Connector c = getConnector();
+    TableOperations tableOps = c.tableOperations();
+    createTable(tableNames[0]);
+    createTable(tableNames[1]);
+    createTable(tableNames[2]);
+    createTable(tableNames[3]);
+    // default is sync
+    tableOps.setProperty(tableNames[1], Property.TABLE_DURABILITY.getKey(), "flush");
+    tableOps.setProperty(tableNames[2], Property.TABLE_DURABILITY.getKey(), "log");
+    tableOps.setProperty(tableNames[3], Property.TABLE_DURABILITY.getKey(), "none");
+    UtilWaitThread.sleep(1000);
+    return tableNames;
+  }
+  
+  private void cleanup(String[] tableNames) throws Exception {
+    Connector c = getConnector();
+    for (String tableName : tableNames) {
+      c.tableOperations().delete(tableName);
     }
   }
+  
+  private void createTable(String tableName) throws Exception {
+    TableOperations tableOps = getConnector().tableOperations();
+    tableOps.create(tableName);
+  }
 
   @Test(timeout = 2 * 60 * 1000)
   public void testWriteSpeed() throws Exception {
-    init();
-    // write some gunk
-    long t0 = writeSome(tableNames[0], N); flush(tableNames[0]);
-    long t1 = writeSome(tableNames[1], N); flush(tableNames[1]);
-    long t2 = writeSome(tableNames[2], N); flush(tableNames[2]);
-    long t3 = writeSome(tableNames[3], N); flush(tableNames[3]);
-    System.out.println(String.format("t0 %d t1 %d t2 %d t3 %d", t0, t1, t2, t3));
+    TableOperations tableOps = getConnector().tableOperations();
+    String tableNames[] = init();
+    // write some gunk, delete the table to keep that table from messing with the performance numbers of successive calls
+    long t0 = writeSome(tableNames[0], N); tableOps.delete(tableNames[0]);
+    long t1 = writeSome(tableNames[1], N); tableOps.delete(tableNames[1]);
+    long t2 = writeSome(tableNames[2], N); tableOps.delete(tableNames[2]);
+    long t3 = writeSome(tableNames[3], N); tableOps.delete(tableNames[3]);
+    System.out.println(String.format("sync %d flush %d log %d none %d", t0, t1, t2, t3));
     assertTrue(t0 > t1);
     assertTrue(t1 > t2);
     assertTrue(t2 > t3);
@@ -84,42 +93,42 @@ public class DurabilityIT extends ConfigurableMacIT {
 
   @Test(timeout = 4 * 60 * 1000)
   public void testSync() throws Exception {
-    init();
+    String tableNames[] = init();
     // sync table should lose nothing
-    getConnector().tableOperations().deleteRows(tableNames[0], null, null);
     writeSome(tableNames[0], N);
     restartTServer();
     assertEquals(N, readSome(tableNames[0], N));
+    cleanup(tableNames);
   }
 
   @Test(timeout = 4 * 60 * 1000)
   public void testFlush() throws Exception {
-    init();
+    String tableNames[] = init();
     // flush table won't lose anything since we're not losing power/dfs
-    getConnector().tableOperations().deleteRows(tableNames[1], null, null);
     writeSome(tableNames[1], N);
     restartTServer();
     assertEquals(N, readSome(tableNames[1], N));
+    cleanup(tableNames);
   }
 
   @Test(timeout = 4 * 60 * 1000)
   public void testLog() throws Exception {
-    init();
+    String tableNames[] = init();
     // we're probably going to lose something the the log setting
-    getConnector().tableOperations().deleteRows(tableNames[2], null, null);
     writeSome(tableNames[2], N);
     restartTServer();
-    assertTrue(N > readSome(tableNames[2], N));
+    assertTrue(N >= readSome(tableNames[2], N));
+    cleanup(tableNames);
   }
 
   @Test(timeout = 4 * 60 * 1000)
   public void testNone() throws Exception {
-    init();
+    String tableNames[] = init();
     // probably won't get any data back without logging
-    getConnector().tableOperations().deleteRows(tableNames[3], null, null);
     writeSome(tableNames[3], N);
     restartTServer();
     assertTrue(N > readSome(tableNames[3], N));
+    cleanup(tableNames);
   }
 
   private long readSome(String table, long n) throws Exception {
@@ -137,26 +146,21 @@ public class DurabilityIT extends ConfigurableMacIT {
     cluster.start();
   }
 
-  private void flush(String table) throws Exception {
-    getConnector().tableOperations().flush(table, null, null, true);
-  }
-
   private long writeSome(String table, long count) throws Exception {
     long now = System.currentTimeMillis();
     Connector c = getConnector();
     BatchWriter bw = c.createBatchWriter(table, null);
     for (int i = 1; i < count + 1; i++) {
-      String data = "" + i;
       Mutation m = new Mutation("" + i);
-      m.put(data, data, data);
+      m.put("", "", "");
       bw.addMutation(m);
-      if (i % (count/100) == 0) {
+      if (i % (Math.max(1, count/100)) == 0) {
         bw.flush();
       }
     }
     bw.close();
     long result = System.currentTimeMillis() - now;
-    c.tableOperations().flush(table, null, null, true);
+    //c.tableOperations().flush(table, null, null, true);
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c2d95a1d/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java b/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
new file mode 100644
index 0000000..1f84327
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
@@ -0,0 +1,75 @@
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.*;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Durability;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+
+public class SessionDurabilityIT extends ConfigurableMacIT {
+  
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+  }
+  
+  @Test
+  public void nondurableTableHasDurableWrites() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none");
+    BatchWriterConfig cfg = new BatchWriterConfig();
+    cfg.setDurability(Durability.SYNC);
+    write(tableName, 10, cfg);
+    assertEquals(10, count(tableName));
+    restartTServer();
+    assertEquals(10, count(tableName));
+  }
+  
+  private int count(String tableName) throws Exception {
+    Connector c = getConnector();
+    Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
+    int result = 0;
+    for (@SuppressWarnings("unused") Entry<Key,Value> entry :scanner) {
+      result++;
+    }
+    return result;
+  }
+
+  private void write(String tableName, int n, BatchWriterConfig cfg) throws Exception {
+    Connector c = getConnector();
+    BatchWriter bw = c.createBatchWriter(tableName, cfg);
+    for (int i = 0; i < 10; i++) {
+      Mutation m = new Mutation(i + "");
+      m.put("", "", "");
+      bw.addMutation(m);
+    }
+    bw.close();
+  }
+  
+  private void restartTServer() throws Exception {
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+      cluster.killProcess(ServerType.TABLET_SERVER, proc);
+    }
+    cluster.start();
+  }
+
+}


Mime
View raw message