accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [12/18] ACCUMULO-1957 added Durability options to the Proxy and Shell, swap test and proxy module dependencies
Date Fri, 05 Sep 2014 21:17:28 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/proxy/src/test/java/org/apache/accumulo/proxy/TestProxySecurityOperations.java
----------------------------------------------------------------------
diff --git a/proxy/src/test/java/org/apache/accumulo/proxy/TestProxySecurityOperations.java b/proxy/src/test/java/org/apache/accumulo/proxy/TestProxySecurityOperations.java
deleted file mode 100644
index e0b17ac..0000000
--- a/proxy/src/test/java/org/apache/accumulo/proxy/TestProxySecurityOperations.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.proxy;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.TreeMap;
-
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.proxy.thrift.SystemPermission;
-import org.apache.accumulo.proxy.thrift.TablePermission;
-import org.apache.accumulo.proxy.thrift.TimeType;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.server.TServer;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestProxySecurityOperations {
-  protected static TServer proxy;
-  protected static Thread thread;
-  protected static TestProxyClient tpc;
-  protected static ByteBuffer userpass;
-  protected static final int port = 10196;
-  protected static final String testtable = "testtable";
-  protected static final String testuser = "VonJines";
-  protected static final ByteBuffer testpw = ByteBuffer.wrap("fiveones".getBytes());
-  
-  @BeforeClass
-  public static void setup() throws Exception {
-    Properties prop = new Properties();
-    prop.setProperty("useMockInstance", "true");
-    prop.put("tokenClass", PasswordToken.class.getName());
-    
-    proxy = Proxy.createProxyServer(Class.forName("org.apache.accumulo.proxy.thrift.AccumuloProxy"), Class.forName("org.apache.accumulo.proxy.ProxyServer"),
-        port, TCompactProtocol.Factory.class, prop);
-    thread = new Thread() {
-      @Override
-      public void run() {
-        proxy.serve();
-      }
-    };
-    thread.start();
-    
-    tpc = new TestProxyClient("localhost", port);
-    userpass = tpc.proxy().login("root", new TreeMap<String,String>() {
-      private static final long serialVersionUID = 1L;
-      
-      {
-        put("password", "");
-      }
-    });
-  }
-  
-  @AfterClass
-  public static void tearDown() throws InterruptedException {
-    proxy.stop();
-    thread.join();
-  }
-  
-  @Before
-  public void makeTestTableAndUser() throws Exception {
-    tpc.proxy().createTable(userpass, testtable, true, TimeType.MILLIS);
-    tpc.proxy().createLocalUser(userpass, testuser, testpw);
-  }
-  
-  @After
-  public void deleteTestTable() throws Exception {
-    tpc.proxy().deleteTable(userpass, testtable);
-    tpc.proxy().dropLocalUser(userpass, testuser);
-  }
-  
-  @Test
-  public void create() throws TException {
-    tpc.proxy().createLocalUser(userpass, testuser + "2", testpw);
-    assertTrue(tpc.proxy().listLocalUsers(userpass).contains(testuser + "2"));
-    tpc.proxy().dropLocalUser(userpass, testuser + "2");
-    assertTrue(!tpc.proxy().listLocalUsers(userpass).contains(testuser + "2"));
-  }
-  
-  @Test
-  public void authenticate() throws TException {
-    assertTrue(tpc.proxy().authenticateUser(userpass, testuser, bb2pp(testpw)));
-    assertFalse(tpc.proxy().authenticateUser(userpass, "EvilUser", bb2pp(testpw)));
-    
-    tpc.proxy().changeLocalUserPassword(userpass, testuser, ByteBuffer.wrap("newpass".getBytes()));
-    assertFalse(tpc.proxy().authenticateUser(userpass, testuser, bb2pp(testpw)));
-    assertTrue(tpc.proxy().authenticateUser(userpass, testuser, bb2pp(ByteBuffer.wrap("newpass".getBytes()))));
-    
-  }
-  
-  @Test
-  public void tablePermissions() throws TException {
-    tpc.proxy().grantTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE);
-    assertTrue(tpc.proxy().hasTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE));
-    
-    tpc.proxy().revokeTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE);
-    assertFalse(tpc.proxy().hasTablePermission(userpass, testuser, testtable, TablePermission.ALTER_TABLE));
-    
-  }
-  
-  @Test
-  public void systemPermissions() throws TException {
-    tpc.proxy().grantSystemPermission(userpass, testuser, SystemPermission.ALTER_USER);
-    assertTrue(tpc.proxy().hasSystemPermission(userpass, testuser, SystemPermission.ALTER_USER));
-    
-    tpc.proxy().revokeSystemPermission(userpass, testuser, SystemPermission.ALTER_USER);
-    assertFalse(tpc.proxy().hasSystemPermission(userpass, testuser, SystemPermission.ALTER_USER));
-    
-  }
-  
-  @Test
-  public void auths() throws TException {
-    HashSet<ByteBuffer> newauths = new HashSet<ByteBuffer>();
-    newauths.add(ByteBuffer.wrap("BBR".getBytes()));
-    newauths.add(ByteBuffer.wrap("Barney".getBytes()));
-    tpc.proxy().changeUserAuthorizations(userpass, testuser, newauths);
-    List<ByteBuffer> actualauths = tpc.proxy().getUserAuthorizations(userpass, testuser);
-    assertEquals(actualauths.size(), newauths.size());
-    
-    for (ByteBuffer auth : actualauths) {
-      assertTrue(newauths.contains(auth));
-    }
-  }
-  
-  private Map<String,String> bb2pp(ByteBuffer cf) {
-    Map<String,String> toRet = new TreeMap<String,String>();
-    toRet.put("password", ByteBufferUtil.toString(cf));
-    return toRet;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/proxy/src/test/java/org/apache/accumulo/proxy/TestProxyTableOperations.java
----------------------------------------------------------------------
diff --git a/proxy/src/test/java/org/apache/accumulo/proxy/TestProxyTableOperations.java b/proxy/src/test/java/org/apache/accumulo/proxy/TestProxyTableOperations.java
deleted file mode 100644
index 87d3454..0000000
--- a/proxy/src/test/java/org/apache/accumulo/proxy/TestProxyTableOperations.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.proxy;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.proxy.thrift.ColumnUpdate;
-import org.apache.accumulo.proxy.thrift.TimeType;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.server.TServer;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestProxyTableOperations {
-  
-  protected static TServer proxy;
-  protected static Thread thread;
-  protected static TestProxyClient tpc;
-  protected static ByteBuffer userpass;
-  protected static final int port = 10195;
-  protected static final String testtable = "testtable";
-  
-  @SuppressWarnings("serial")
-  @BeforeClass
-  public static void setup() throws Exception {
-    Properties prop = new Properties();
-    prop.setProperty("useMockInstance", "true");
-    prop.put("tokenClass", PasswordToken.class.getName());
-    
-    proxy = Proxy.createProxyServer(Class.forName("org.apache.accumulo.proxy.thrift.AccumuloProxy"), Class.forName("org.apache.accumulo.proxy.ProxyServer"),
-        port, TCompactProtocol.Factory.class, prop);
-    thread = new Thread() {
-      @Override
-      public void run() {
-        proxy.serve();
-      }
-    };
-    thread.start();
-    tpc = new TestProxyClient("localhost", port);
-    userpass = tpc.proxy().login("root", new TreeMap<String,String>() {
-      {
-        put("password", "");
-      }
-    });
-  }
-  
-  @AfterClass
-  public static void tearDown() throws InterruptedException {
-    proxy.stop();
-    thread.join();
-  }
-  
-  @Before
-  public void makeTestTable() throws Exception {
-    tpc.proxy().createTable(userpass, testtable, true, TimeType.MILLIS);
-  }
-  
-  @After
-  public void deleteTestTable() throws Exception {
-    tpc.proxy().deleteTable(userpass, testtable);
-  }
-  
-  @Test
-  public void createExistsDelete() throws TException {
-    assertFalse(tpc.proxy().tableExists(userpass, "testtable2"));
-    tpc.proxy().createTable(userpass, "testtable2", true, TimeType.MILLIS);
-    assertTrue(tpc.proxy().tableExists(userpass, "testtable2"));
-    tpc.proxy().deleteTable(userpass, "testtable2");
-    assertFalse(tpc.proxy().tableExists(userpass, "testtable2"));
-  }
-  
-  @Test
-  public void listRename() throws TException {
-    assertFalse(tpc.proxy().tableExists(userpass, "testtable2"));
-    tpc.proxy().renameTable(userpass, testtable, "testtable2");
-    assertTrue(tpc.proxy().tableExists(userpass, "testtable2"));
-    tpc.proxy().renameTable(userpass, "testtable2", testtable);
-    assertTrue(tpc.proxy().listTables(userpass).contains("testtable"));
-    
-  }
-  
-  // This test does not yet function because the backing Mock instance does not yet support merging
-  @Test
-  public void merge() throws TException {
-    Set<ByteBuffer> splits = new HashSet<ByteBuffer>();
-    splits.add(ByteBuffer.wrap("a".getBytes()));
-    splits.add(ByteBuffer.wrap("c".getBytes()));
-    splits.add(ByteBuffer.wrap("z".getBytes()));
-    tpc.proxy().addSplits(userpass, testtable, splits);
-    
-    tpc.proxy().mergeTablets(userpass, testtable, ByteBuffer.wrap("b".getBytes()), ByteBuffer.wrap("d".getBytes()));
-    
-    splits.remove(ByteBuffer.wrap("c".getBytes()));
-    
-    List<ByteBuffer> tableSplits = tpc.proxy().listSplits(userpass, testtable, 10);
-    
-    for (ByteBuffer split : tableSplits)
-      assertTrue(splits.contains(split));
-    assertTrue(tableSplits.size() == splits.size());
-    
-  }
-  
-  @Test
-  public void splits() throws TException {
-    Set<ByteBuffer> splits = new HashSet<ByteBuffer>();
-    splits.add(ByteBuffer.wrap("a".getBytes()));
-    splits.add(ByteBuffer.wrap("b".getBytes()));
-    splits.add(ByteBuffer.wrap("z".getBytes()));
-    tpc.proxy().addSplits(userpass, testtable, splits);
-    
-    List<ByteBuffer> tableSplits = tpc.proxy().listSplits(userpass, testtable, 10);
-    
-    for (ByteBuffer split : tableSplits)
-      assertTrue(splits.contains(split));
-    assertTrue(tableSplits.size() == splits.size());
-  }
-  
-  @Test
-  public void constraints() throws TException {
-    int cid = tpc.proxy().addConstraint(userpass, testtable, "org.apache.accumulo.TestConstraint");
-    Map<String,Integer> constraints = tpc.proxy().listConstraints(userpass, testtable);
-    assertEquals((int) constraints.get("org.apache.accumulo.TestConstraint"), cid);
-    tpc.proxy().removeConstraint(userpass, testtable, cid);
-    constraints = tpc.proxy().listConstraints(userpass, testtable);
-    assertNull(constraints.get("org.apache.accumulo.TestConstraint"));
-  }
-  
-  @Test
-  public void localityGroups() throws TException {
-    Map<String,Set<String>> groups = new HashMap<String,Set<String>>();
-    Set<String> group1 = new HashSet<String>();
-    group1.add("cf1");
-    groups.put("group1", group1);
-    Set<String> group2 = new HashSet<String>();
-    group2.add("cf2");
-    group2.add("cf3");
-    groups.put("group2", group2);
-    tpc.proxy().setLocalityGroups(userpass, testtable, groups);
-    
-    Map<String,Set<String>> actualGroups = tpc.proxy().getLocalityGroups(userpass, testtable);
-    
-    assertEquals(groups.size(), actualGroups.size());
-    for (String groupName : groups.keySet()) {
-      assertTrue(actualGroups.containsKey(groupName));
-      assertEquals(groups.get(groupName).size(), actualGroups.get(groupName).size());
-      for (String cf : groups.get(groupName)) {
-        assertTrue(actualGroups.get(groupName).contains(cf));
-      }
-    }
-  }
-  
-  @Test
-  public void tableProperties() throws TException {
-    tpc.proxy().setTableProperty(userpass, testtable, "test.property1", "wharrrgarbl");
-    assertEquals(tpc.proxy().getTableProperties(userpass, testtable).get("test.property1"), "wharrrgarbl");
-    tpc.proxy().removeTableProperty(userpass, testtable, "test.property1");
-    assertNull(tpc.proxy().getTableProperties(userpass, testtable).get("test.property1"));
-  }
-  
-  private static void addMutation(Map<ByteBuffer,List<ColumnUpdate>> mutations, String row, String cf, String cq, String value) {
-    ColumnUpdate update = new ColumnUpdate(ByteBuffer.wrap(cf.getBytes()), ByteBuffer.wrap(cq.getBytes()));
-    update.setValue(value.getBytes());
-    mutations.put(ByteBuffer.wrap(row.getBytes()), Collections.singletonList(update));
-  }
-  
-  @Test
-  public void tableOperationsRowMethods() throws TException {
-    Map<ByteBuffer,List<ColumnUpdate>> mutations = new HashMap<ByteBuffer,List<ColumnUpdate>>();
-    for (int i = 0; i < 10; i++) {
-      addMutation(mutations, "" + i, "cf", "cq", "");
-    }
-    tpc.proxy().updateAndFlush(userpass, testtable, mutations);
-    
-    assertEquals(tpc.proxy().getMaxRow(userpass, testtable, null, null, true, null, true), ByteBuffer.wrap("9".getBytes()));
-    
-    tpc.proxy().deleteRows(userpass, testtable, ByteBuffer.wrap("51".getBytes()), ByteBuffer.wrap("99".getBytes()));
-    assertEquals(tpc.proxy().getMaxRow(userpass, testtable, null, null, true, null, true), ByteBuffer.wrap("5".getBytes()));
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index a8c6afa..0db1e5d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -315,6 +315,41 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
             log.debug("Prepping table " + id + " for compaction cancellations.");
             zoo.putPersistentData(zooRoot + Constants.ZTABLES + "/" + id + Constants.ZTABLE_COMPACT_CANCEL_ID, zero, NodeExistsPolicy.SKIP);
           }
+
+          @SuppressWarnings("deprecation")
+          String zpath = zooRoot + Constants.ZCONFIG + "/" + Property.TSERV_WAL_SYNC_METHOD.getKey();
+          boolean flushDefault = false;
+          try {
+            byte data[] = zoo.getData(zpath, null);
+            if (new String(data, StandardCharsets.UTF_8).endsWith("flush")) {
+              flushDefault = true;
+            }
+          } catch (KeeperException.NoNodeException ex) {
+            // skip
+          } 
+          for (String id : zoo.getChildren(zooRoot + Constants.ZTABLES)) {
+            log.debug("Converting table " + id + " WALog setting to Durability");
+            try {
+              @SuppressWarnings("deprecation")
+              String path = zooRoot + Constants.ZTABLES + "/" + id + Constants.ZTABLE_CONF + "/" + Property.TABLE_WALOG_ENABLED.getKey();
+              byte[] data = zoo.getData(path, null);
+              boolean useWAL = Boolean.parseBoolean(new String(data, StandardCharsets.UTF_8));
+              zoo.recursiveDelete(path, NodeMissingPolicy.FAIL);
+              path = zooRoot + Constants.ZTABLES + "/" + id + Constants.ZTABLE_CONF + "/" + Property.TABLE_DURABILITY.getKey();
+              if (useWAL) {
+                if (flushDefault) {
+                  zoo.putPersistentData(path, "flush".getBytes(), NodeExistsPolicy.SKIP);
+                } else {
+                  zoo.putPersistentData(path, "sync".getBytes(), NodeExistsPolicy.SKIP);
+                }
+              } else {
+                zoo.putPersistentData(path, "none".getBytes(), NodeExistsPolicy.SKIP);
+              }
+            } catch (KeeperException.NoNodeException ex) {
+              // skip it
+            }
+          }
+        
         }
 
         // create initial namespaces

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
index f9c443a..9cc07dc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java
@@ -22,8 +22,8 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.client.Durability;
 
 public class TabletMutations {
-  private final int tid; 
-  private final int seq; 
+  private final int tid;
+  private final int seq;
   private final List<Mutation> mutations;
   private final Durability durability;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 8a9c510..f164621 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -60,6 +60,7 @@ import org.apache.accumulo.core.client.Durability;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.CompressedIterators;
 import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
+import org.apache.accumulo.core.client.impl.DurabilityImpl;
 import org.apache.accumulo.core.client.impl.ScannerImpl;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletType;
@@ -349,6 +350,8 @@ public class TabletServer implements Runnable {
   private final WriteTracker writeTracker = new WriteTracker();
 
   private final RowLocks rowLocks = new RowLocks();
+  
+  private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
 
   private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
 
@@ -655,7 +658,7 @@ public class TabletServer implements Runnable {
     @Override
     public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability tdurabilty) throws ThriftSecurityException {
       // Make sure user is real
-      Durability durability = Durability.fromThrift(tdurabilty);
+      Durability durability = DurabilityImpl.fromThrift(tdurabilty);
       security.authenticateUser(credentials, credentials);
       if (updateMetrics.isEnabled())
         updateMetrics.add(TabletServerUpdateMetrics.permissionErrors, 0);
@@ -726,14 +729,18 @@ public class TabletServer implements Runnable {
         setUpdateTablet(us, keyExtent);
 
         if (us.currentTablet != null) {
+          long additionalMutationSize = 0;
           List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
           for (TMutation tmutation : tmutations) {
             Mutation mutation = new ServerMutation(tmutation);
             mutations.add(mutation);
-            us.queuedMutationSize += mutation.numBytes();
+            additionalMutationSize += mutation.numBytes();
           }
-          if (us.queuedMutationSize > TabletServer.this.getConfiguration().getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX))
+          us.queuedMutationSize += additionalMutationSize;
+          long totalQueued = TabletServer.this.updateTotalQueuedMutationSize(additionalMutationSize);
+          if (totalQueued > TabletServer.this.getConfiguration().getMemoryInBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX)) {
             flush(us);
+          }
         }
       } finally {
         sessionManager.unreserveSession(us);
@@ -775,8 +782,8 @@ public class TabletServer implements Runnable {
                 }
                 us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
               } else {
-                log.debug("Durablity for " + tablet.getExtent() + " durability " + us.durability + " table durability " + tabletDurability + " using " + us.durability.resolveDurability(tabletDurability));
-                sendables.put(commitSession, new Mutations(us.durability.resolveDurability(tabletDurability), mutations));
+                log.debug("Durablity for " + tablet.getExtent() + " durability " + us.durability + " table durability " + tabletDurability + " using " + DurabilityImpl.resolveDurabilty(us.durability, tabletDurability));
+                sendables.put(commitSession, new Mutations(DurabilityImpl.resolveDurabilty(us.durability, tabletDurability), mutations));
                 mutationCount += mutations.size();
               }
 
@@ -791,7 +798,7 @@ public class TabletServer implements Runnable {
                 // violate constraints... this is what
                 // prepareMutationsForCommit()
                 // expects
-                sendables.put(e.getCommitSession(), new Mutations(us.durability.resolveDurability(tabletDurability), e.getNonViolators()));
+                sendables.put(e.getCommitSession(), new Mutations(DurabilityImpl.resolveDurabilty(us.durability, tabletDurability), e.getNonViolators()));
               }
 
               mutationCount += mutations.size();
@@ -880,6 +887,7 @@ public class TabletServer implements Runnable {
         if (us.currentTablet != null) {
           us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
         }
+        TabletServer.this.updateTotalQueuedMutationSize(-us.queuedMutationSize);
         us.queuedMutationSize = 0;
       }
       us.totalUpdates += mutationCount;
@@ -977,7 +985,7 @@ public class TabletServer implements Runnable {
           try {
             final Span wal = Trace.start("wal");
             try {
-              logger.log(cs, cs.getWALogSeq(), mutation, Durability.fromThrift(tdurability).resolveDurability(tabletDurability));
+              logger.log(cs, cs.getWALogSeq(), mutation, DurabilityImpl.resolveDurabilty(DurabilityImpl.fromThrift(tdurability), tabletDurability));
             } finally {
               wal.stop();
             }
@@ -1108,12 +1116,12 @@ public class TabletServer implements Runnable {
                 } else {
                   for (ServerConditionalMutation scm : entry.getValue())
                     results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
-                  sendables.put(cs, new Mutations(sess.durability.resolveDurability(tabletDurability), mutations));
+                  sendables.put(cs, new Mutations(DurabilityImpl.resolveDurabilty(sess.durability, tabletDurability), mutations));
                 }
               }
             } catch (TConstraintViolationException e) {
               if (e.getNonViolators().size() > 0) {
-                sendables.put(e.getCommitSession(), new Mutations(sess.durability.resolveDurability(tabletDurability), e.getNonViolators()));
+                sendables.put(e.getCommitSession(), new Mutations(DurabilityImpl.resolveDurabilty(sess.durability, tabletDurability), e.getNonViolators()));
                 for (Mutation m : e.getNonViolators())
                   results.add(new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
               }
@@ -1214,7 +1222,7 @@ public class TabletServer implements Runnable {
         if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
           throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.BAD_AUTHORIZATIONS);
 
-      ConditionalSession cs = new ConditionalSession(credentials, new Authorizations(authorizations), tableId, Durability.fromThrift(tdurabilty));
+      ConditionalSession cs = new ConditionalSession(credentials, new Authorizations(authorizations), tableId, DurabilityImpl.fromThrift(tdurabilty));
 
       long sid = sessionManager.createSession(cs, false);
       return new TConditionalSession(sid, lockID, sessionManager.getMaxIdleTime());
@@ -1756,6 +1764,10 @@ public class TabletServer implements Runnable {
     return majorCompactorDisabled;
   }
 
+  public long updateTotalQueuedMutationSize(long additionalMutationSize) {
+    return totalQueuedMutationSize .addAndGet(additionalMutationSize);
+  }
+
   public Tablet getOnlineTablet(KeyExtent extent) {
     return onlineTablets.get(extent);
   }
@@ -2905,7 +2917,7 @@ public class TabletServer implements Runnable {
 
   public int createLogId(KeyExtent tablet) {
     AccumuloConfiguration acuTableConf = getTableConfiguration(tablet);
-    if (Durability.fromString(acuTableConf.get(Property.TABLE_DURABILITY)) != Durability.NONE) {
+    if (DurabilityImpl.fromString(acuTableConf.get(Property.TABLE_DURABILITY)) != Durability.NONE) {
       return logIdGenerator.incrementAndGet();
     }
     return -1;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 2aa6380..25c0ee8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -156,15 +156,16 @@ public class TabletServerResourceManager {
     long blockSize = acuConf.getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE);
     long dCacheSize = acuConf.getMemoryInBytes(Property.TSERV_DATACACHE_SIZE);
     long iCacheSize = acuConf.getMemoryInBytes(Property.TSERV_INDEXCACHE_SIZE);
+    long totalQueueSize = acuConf.getMemoryInBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
 
     _iCache = new LruBlockCache(iCacheSize, blockSize);
     _dCache = new LruBlockCache(dCacheSize, blockSize);
 
     Runtime runtime = Runtime.getRuntime();
-    if (!usingNativeMap && maxMemory + dCacheSize + iCacheSize > runtime.maxMemory()) {
+    if (!usingNativeMap && maxMemory + dCacheSize + iCacheSize + totalQueueSize > runtime.maxMemory()) {
       throw new IllegalArgumentException(String.format(
-          "Maximum tablet server map memory %,d and block cache sizes %,d is too large for this JVM configuration %,d", maxMemory, dCacheSize + iCacheSize,
-          runtime.maxMemory()));
+          "Maximum tablet server map memory %,d block cache sizes %,d and mutation queue size %,d is too large for this JVM configuration %,d", maxMemory, dCacheSize + iCacheSize,
+          totalQueueSize, runtime.maxMemory()));
     }
     runtime.gc();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index d374492..50475c2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -152,12 +152,9 @@ public class DfsLogger {
           switch (logWork.durability) {
             case DEFAULT:
             case NONE:
-              // shouldn't make it to the work queue
-              log.warn("unexpected durability " + logWork.durability, new Throwable());
-              break;
             case LOG:
-              // do nothing
-              break;
+              // shouldn't make it to the work queue
+              throw new IllegalArgumentException("unexpected durability " + logWork.durability);
             case SYNC:
               durabilityMethod = sync;
               break loop;
@@ -556,7 +553,17 @@ public class DfsLogger {
         durability = tabletMutations.getDurability();
       }
     }
-    return logFileData(data, durability);
+    return logFileData(data, chooseDurabilityForGroupCommit(mutations));
+  }
+
+  static Durability chooseDurabilityForGroupCommit(List<TabletMutations> mutations) {
+    Durability result = Durability.NONE;
+    for (TabletMutations tabletMutations : mutations) {
+      if (tabletMutations.getDurability().ordinal() > result.ordinal()) {
+        result = tabletMutations.getDurability();
+      }
+    }
+    return result;
   }
 
   public LoggerOperation minorCompactionFinished(int seq, int tid, String fqfn) throws IOException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index cb476c9..2a540e5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -361,10 +361,12 @@ public class TabletServerLogger {
   }
 
   public int log(final CommitSession commitSession, final int tabletSeq, final Mutation m, final Durability durability) throws IOException {
-    if (durability == Durability.NONE)
+    if (durability == Durability.NONE) {
       return -1;
-    if (durability == Durability.DEFAULT)
-      log.warn("Unexpected durability " + durability, new Throwable());
+    }
+    if (durability == Durability.DEFAULT) {
+      throw new IllegalArgumentException("Unexpected durability " + durability);
+    }
     int seq = write(commitSession, false, new Writer() {
       @Override
       public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 965324a..c3380bf 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -44,6 +44,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Durability;
+import org.apache.accumulo.core.client.impl.DurabilityImpl;
 import org.apache.accumulo.core.client.impl.ScannerImpl;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
@@ -2384,6 +2385,7 @@ public class Tablet implements TabletCommitter {
 
   // this lock is basically used to synchronize writing of log info to metadata
   private final ReentrantLock logLock = new ReentrantLock();
+  private AtomicLong totalQueuedMutationSize = new AtomicLong(0);
 
   public synchronized int getLogCount() {
     return currentLogs.size();
@@ -2512,7 +2514,7 @@ public class Tablet implements TabletCommitter {
 
   @Override
   public Durability getDurability() {
-    return Durability.fromString(getTableConfiguration().get(Property.TABLE_DURABILITY));
+    return DurabilityImpl.fromString(getTableConfiguration().get(Property.TABLE_DURABILITY));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
new file mode 100644
index 0000000..2ae37ed
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.core.client.Durability;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.tserver.TabletMutations;
+import org.junit.Test;
+
+public class DfsLoggerTest {
+  
+  @Test
+  public void testDurabilityForGroupCommit() {
+    List<TabletMutations> lst = new ArrayList<TabletMutations>();
+    assertEquals(Durability.NONE, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    TabletMutations m1 = new TabletMutations(0, 1, Collections.<Mutation>emptyList(), Durability.NONE);
+    lst.add(m1);
+    assertEquals(Durability.NONE, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    TabletMutations m2 = new TabletMutations(0, 1, Collections.<Mutation>emptyList(), Durability.LOG);
+    lst.add(m2);
+    assertEquals(Durability.LOG, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    TabletMutations m3 = new TabletMutations(0, 1, Collections.<Mutation>emptyList(), Durability.NONE);
+    lst.add(m3);
+    assertEquals(Durability.LOG, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    TabletMutations m4 = new TabletMutations(0, 1, Collections.<Mutation>emptyList(), Durability.FLUSH);
+    lst.add(m4);
+    assertEquals(Durability.FLUSH, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    TabletMutations m5 = new TabletMutations(0, 1, Collections.<Mutation>emptyList(), Durability.LOG);
+    lst.add(m5);
+    assertEquals(Durability.FLUSH, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    TabletMutations m6 = new TabletMutations(0, 1, Collections.<Mutation>emptyList(), Durability.SYNC);
+    lst.add(m6);
+    assertEquals(Durability.SYNC, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    TabletMutations m7 = new TabletMutations(0, 1, Collections.<Mutation>emptyList(), Durability.FLUSH);
+    lst.add(m7);
+    assertEquals(Durability.SYNC, DfsLogger.chooseDurabilityForGroupCommit(lst));
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/shell/src/main/java/org/apache/accumulo/shell/commands/InsertCommand.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/InsertCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/InsertCommand.java
index 19ae5b8..5fae7ec 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/InsertCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/InsertCommand.java
@@ -26,6 +26,7 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Durability;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.security.SecurityErrorCode;
@@ -46,6 +47,7 @@ import org.apache.hadoop.io.Text;
 public class InsertCommand extends Command {
   private Option insertOptAuths, timestampOpt;
   private Option timeoutOption;
+  private Option durabilityOption;
   
   protected long getTimeout(final CommandLine cl) {
     if (cl.hasOption(timeoutOption.getLongOpt())) {
@@ -78,8 +80,27 @@ public class InsertCommand extends Command {
     else
       m.put(colf, colq, val);
     
-    final BatchWriter bw = shellState.getConnector().createBatchWriter(shellState.getTableName(),
-        new BatchWriterConfig().setMaxMemory(Math.max(m.estimatedMemoryUsed(), 1024)).setMaxWriteThreads(1).setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS));
+    final BatchWriterConfig cfg = new BatchWriterConfig().setMaxMemory(Math.max(m.estimatedMemoryUsed(), 1024)).setMaxWriteThreads(1).setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS);
+    if (cl.hasOption(durabilityOption.getOpt())) {
+      String userDurability = cl.getOptionValue(durabilityOption.getOpt());
+      switch (userDurability) {
+        case "sync":
+          cfg.setDurability(Durability.SYNC); 
+          break;
+        case "flush":
+          cfg.setDurability(Durability.FLUSH);
+          break;
+        case "none":
+          cfg.setDurability(Durability.NONE);
+          break;
+        case "log":
+          cfg.setDurability(Durability.NONE);
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown durability: " + userDurability);
+      }
+    }
+    final BatchWriter bw = shellState.getConnector().createBatchWriter(shellState.getTableName(), cfg);
     bw.addMutation(m);
     try {
       bw.close();
@@ -138,6 +159,8 @@ public class InsertCommand extends Command {
     timeoutOption.setArgName("timeout");
     o.addOption(timeoutOption);
     
+    durabilityOption = new Option("d", "durability", true, "durability to use for insert, should be one of \"none\" \"log\" \"flush\" or \"sync\"");
+    
     return o;
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
index 80e8bee..fd6988c 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -100,6 +100,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-proxy</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-server-base</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java b/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java
index 51880b5..b563ed9 100644
--- a/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/WrongTabletTest.java
@@ -30,20 +30,20 @@ import org.apache.hadoop.io.Text;
 import com.beust.jcommander.Parameter;
 
 public class WrongTabletTest {
-  
+
   static class Opts extends ClientOpts {
     @Parameter(names = "--location", required = true)
     String location;
   }
-  
+
   public static void main(String[] args) {
     Opts opts = new Opts();
     opts.parseArgs(WrongTabletTest.class.getName(), args);
-    
+
     ServerConfigurationFactory conf = new ServerConfigurationFactory(opts.getInstance());
     try {
       TabletClientService.Iface client = ThriftUtil.getTServerClient(opts.location, conf.getConfiguration());
-      
+
       Mutation mutation = new Mutation(new Text("row_0003750001"));
       mutation.putDelete(new Text("colf"), new Text("colq"));
       client.update(Tracer.traceInfo(), new Credentials(opts.principal, opts.getToken()).toThrift(opts.getInstance()), new KeyExtent(new Text("!!"), null,

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index ee8a80d..8ab1e98 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -80,152 +80,152 @@ import com.google.common.net.HostAndPort;
 /**
  * The purpose of this class is to server as fake tserver that is a data sink like /dev/null. NullTserver modifies the metadata location entries for a table to
  * point to it. This allows thrift performance to be measured by running any client code that writes to a table.
- * 
+ *
  */
 
 public class NullTserver {
-  
+
   public static class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface {
-    
+
     private long updateSession = 1;
-    
+
     public ThriftClientHandler(Instance instance, TransactionWatcher watcher) {
       super(instance, watcher, null);
     }
-    
+
     @Override
     public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability durability) {
       return updateSession++;
     }
-    
+
     @Override
     public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent keyExtent, List<TMutation> mutation) {}
-    
+
     @Override
     public UpdateErrors closeUpdate(TInfo tinfo, long updateID) {
       return new UpdateErrors(new HashMap<TKeyExtent,Long>(), new ArrayList<TConstraintViolationSummary>(), new HashMap<TKeyExtent,SecurityErrorCode>());
     }
-    
+
     @Override
     public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, long tid, Map<TKeyExtent,Map<String,MapFileInfo>> files, boolean setTime) {
       return null;
     }
-    
+
     @Override
     public void closeMultiScan(TInfo tinfo, long scanID) {}
-    
+
     @Override
     public void closeScan(TInfo tinfo, long scanID) {}
-    
+
     @Override
     public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) {
       return null;
     }
-    
+
     @Override
     public ScanResult continueScan(TInfo tinfo, long scanID) {
       return null;
     }
-    
+
     @Override
     public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent extent, ByteBuffer splitPoint) {
-      
+
     }
-    
+
     @Override
     public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> batch, List<TColumn> columns,
         List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) {
       return null;
     }
-    
+
     @Override
     public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent extent, TRange range, List<TColumn> columns, int batchSize,
         List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) {
       return null;
     }
-    
+
     @Override
     public void update(TInfo tinfo, TCredentials credentials, TKeyExtent keyExtent, TMutation mutation, TDurability durability) {
-      
+
     }
-    
+
     @Override
     public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
       return null;
     }
-    
+
     @Override
     public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId) throws ThriftSecurityException, TException {
       return null;
     }
-    
+
     @Override
     public TabletStats getHistoricalStats(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
       return null;
     }
-    
+
     @Override
     public void halt(TInfo tinfo, TCredentials credentials, String lock) throws ThriftSecurityException, TException {}
-    
+
     @Override
     public void fastHalt(TInfo tinfo, TCredentials credentials, String lock) {}
-    
+
     @Override
     public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent) throws TException {}
-    
+
     @Override
     public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent, boolean save) throws TException {}
-    
+
     @Override
     public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
       return new ArrayList<ActiveScan>();
     }
-    
+
     @Override
     public void chop(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent) throws TException {}
-    
+
     @Override
     public void flushTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent) throws TException {
-      
+
     }
-    
+
     @Override
     public void compact(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException {
-      
+
     }
-    
+
     @Override
     public void flush(TInfo tinfo, TCredentials credentials, String lock, String tableId, ByteBuffer startRow, ByteBuffer endRow) throws TException {
-      
+
     }
-    
+
     @Override
     public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {}
-    
+
     @Override
     public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {
       return new ArrayList<ActiveCompaction>();
     }
-    
+
     @Override
     public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials, List<ByteBuffer> authorizations, String tableID, TDurability durability)
         throws ThriftSecurityException, TException {
       return null;
     }
-    
+
     @Override
     public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID, Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
         throws NoSuchScanIDException, TException {
       return null;
     }
-    
+
     @Override
     public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {}
-    
+
     @Override
     public void closeConditionalUpdate(TInfo tinfo, long sessID) throws TException {}
   }
-  
+
   static class Opts extends Help {
     @Parameter(names = {"-i", "--instance"}, description = "instance name", required = true)
     String iname = null;
@@ -236,22 +236,22 @@ public class NullTserver {
     @Parameter(names = "--port", description = "port number to use")
     int port = DefaultConfiguration.getInstance().getPort(Property.TSERV_CLIENTPORT);
   }
-  
+
   public static void main(String[] args) throws Exception {
     Opts opts = new Opts();
     opts.parseArgs(NullTserver.class.getName(), args);
-    
+
     TransactionWatcher watcher = new TransactionWatcher();
     ThriftClientHandler tch = new ThriftClientHandler(HdfsZooInstance.getInstance(), watcher);
     Processor<Iface> processor = new Processor<Iface>(tch);
     TServerUtils.startTServer(HostAndPort.fromParts("0.0.0.0", opts.port), processor, "NullTServer", "null tserver", 2, 1, 1000, 10 * 1024 * 1024, null, -1);
-    
+
     HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);
-    
+
     // modify metadata
     ZooKeeperInstance zki = new ZooKeeperInstance(new ClientConfiguration().withInstance(opts.iname).withZkHosts(opts.keepers));
     String tableId = Tables.getTableId(zki, opts.tableName);
-    
+
     // read the locations for the table
     Range tableRange = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
     MetaDataTableScanner s = new MetaDataTableScanner(zki, SystemCredentials.get(), tableRange);
@@ -266,7 +266,7 @@ public class NullTserver {
     // point them to this server
     MetaDataStateStore store = new MetaDataStateStore();
     store.setLocations(assignments);
-    
+
     while (true) {
       UtilWaitThread.sleep(10000);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java
index 4af85a7..8d14574 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java
@@ -72,7 +72,7 @@ public class Config extends Test {
       s(Property.TSERV_MAXMEM, 1000000, 3 * 1024 * 1024 * 1024L),
       s(Property.TSERV_READ_AHEAD_MAXCONCURRENT, 1, 25),
       s(Property.TSERV_MIGRATE_MAXCONCURRENT, 1, 10),
-      s(Property.TSERV_MUTATION_QUEUE_MAX, 10000, 1024 * 1024),
+      s(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, 10000, 1024 * 1024),
       s(Property.TSERV_RECOVERY_MAX_CONCURRENT, 1, 100),
       s(Property.TSERV_SCAN_MAX_OPENFILES, 10, 1000),
       s(Property.TSERV_THREADCHECK, 100, 10000),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a0beab0/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
new file mode 100644
index 0000000..f8bcbfb
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.proxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.proxy.thrift.AccumuloProxy.Client;
+import org.apache.accumulo.proxy.thrift.Column;
+import org.apache.accumulo.proxy.thrift.ColumnUpdate;
+import org.apache.accumulo.proxy.thrift.Condition;
+import org.apache.accumulo.proxy.thrift.ConditionalStatus;
+import org.apache.accumulo.proxy.thrift.ConditionalUpdates;
+import org.apache.accumulo.proxy.thrift.ConditionalWriterOptions;
+import org.apache.accumulo.proxy.thrift.Durability;
+import org.apache.accumulo.proxy.thrift.TimeType;
+import org.apache.accumulo.proxy.thrift.WriterOptions;
+import org.apache.accumulo.server.util.PortUtils;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.test.functional.FunctionalTestUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.thrift.protocol.TJSONProtocol.Factory;
+import org.apache.thrift.server.TServer;
+import org.junit.Test;
+
+public class ProxyDurabilityIT extends ConfigurableMacIT {
+  
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+    cfg.setNumTservers(1);
+  }
+  
+  private static ByteBuffer bytes(String value) {
+    return ByteBuffer.wrap(value.getBytes());
+  }
+  
+  @Test
+  public void testDurability() throws Exception {
+    Connector c = getConnector();
+    Properties props = new Properties();
+    props.put("instance", c.getInstance().getInstanceName());
+    props.put("zookeepers", c.getInstance().getZooKeepers());
+    props.put("tokenClass", PasswordToken.class.getName());
+
+    Class<Factory> protocolClass = org.apache.thrift.protocol.TJSONProtocol.Factory.class;
+
+    int proxyPort = PortUtils.getRandomFreePort();
+    final TServer proxyServer = Proxy.createProxyServer(org.apache.accumulo.proxy.thrift.AccumuloProxy.class, org.apache.accumulo.proxy.ProxyServer.class, proxyPort,
+        protocolClass, props);
+    Thread thread = new Thread() {
+      @Override
+      public void run() {
+        proxyServer.serve();
+      }
+    };
+    thread.start();
+    while (!proxyServer.isServing())
+      UtilWaitThread.sleep(100);
+    Client client = new TestProxyClient("localhost", proxyPort, protocolClass.newInstance()).proxy();
+    Map<String,String> properties = new TreeMap<String,String>();
+    properties.put("password", ROOT_PASSWORD);
+    ByteBuffer login = client.login("root", properties);
+    
+    String tableName = getUniqueNames(1)[0];
+    client.createTable(login, tableName, true, TimeType.MILLIS);
+    assertTrue(c.tableOperations().exists(tableName));
+    
+    WriterOptions options = new WriterOptions();
+    options.setDurability(Durability.NONE);
+    String writer = client.createWriter(login, tableName, options);
+    Map<ByteBuffer,List<ColumnUpdate>> cells = new TreeMap<ByteBuffer, List<ColumnUpdate>>();
+    ColumnUpdate column = new ColumnUpdate(bytes("cf"), bytes("cq"));
+    column.setValue("value".getBytes());
+    cells.put(bytes("row"), Collections.singletonList(column));
+    client.update(writer, cells);
+    client.closeWriter(writer);
+    assertEquals(1, count(tableName));
+    restartTServer();
+    assertEquals(0, count(tableName));
+    
+    ConditionalWriterOptions cfg = new ConditionalWriterOptions();
+    cfg.setDurability(Durability.LOG);
+    String cwriter = client.createConditionalWriter(login, tableName, cfg);
+    ConditionalUpdates updates = new ConditionalUpdates();
+    updates.addToConditions(new Condition(new Column(bytes("cf"), bytes("cq"), bytes(""))));
+    updates.addToUpdates(column);
+    Map<ByteBuffer,ConditionalStatus> status = client.updateRowsConditionally(cwriter, Collections.singletonMap(bytes("row"), updates));
+    assertEquals(ConditionalStatus.ACCEPTED, status.get(bytes("row")));
+    assertEquals(1, count(tableName));
+    restartTServer();
+    assertEquals(0, count(tableName));
+    
+    proxyServer.stop();
+    thread.join();
+  }
+
+  private void restartTServer() throws Exception {
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+      cluster.killProcess(ServerType.TABLET_SERVER, proc);
+    }
+    cluster.start();
+  }
+
+  private int count(String tableName) throws Exception {
+    Connector c = getConnector();
+    return FunctionalTestUtils.count(c.createScanner(tableName, Authorizations.EMPTY));
+  }
+  
+}


Mime
View raw message