accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject git commit: ACCUMULO-1950 use server-wide memory count
Date Mon, 08 Sep 2014 21:03:41 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 2533b7eff -> ef701cabb


ACCUMULO-1950 use server-wide memory count


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ef701cab
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ef701cab
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ef701cab

Branch: refs/heads/master
Commit: ef701cabbd5e1fed993bbd13e86c40b7d72040d0
Parents: 2533b7e
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Mon Sep 8 16:38:49 2014 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Mon Sep 8 16:38:49 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   6 +-
 .../core/master/thrift/TabletServerStatus.java  | 198 ++++++++++++++++++-
 core/src/main/thrift/master.thrift              |   2 +
 .../apache/accumulo/tserver/TabletServer.java   |  30 ++-
 .../tserver/TabletServerResourceManager.java    |   2 +-
 .../apache/accumulo/tserver/log/DfsLogger.java  |  12 +-
 .../tserver/log/TabletServerLogger.java         |  15 +-
 .../test/randomwalk/concurrent/Config.java      |   2 +-
 .../org/apache/accumulo/test/TotalQueuedIT.java | 129 ++++++++++++
 .../accumulo/test/functional/BloomFilterIT.java |   2 +-
 10 files changed, 374 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 5401c7c..35cd0a6 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -197,11 +197,15 @@ public enum Property {
  TSERV_INDEXCACHE_SIZE("tserver.cache.index.size", "512M", PropertyType.MEMORY, "Specifies the size of the cache for file indices."),
  TSERV_PORTSEARCH("tserver.port.search", "false", PropertyType.BOOLEAN, "if the ports above are in use, search higher ports until one is available"),
  TSERV_CLIENTPORT("tserver.port.client", "9997", PropertyType.PORT, "The port used for handling client connections on the tablet servers"),
+  @Deprecated
   TSERV_MUTATION_QUEUE_MAX("tserver.mutation.queue.max", "1M", PropertyType.MEMORY,
-          "The amount of memory to use to store write-ahead-log mutations-per-session before flushing them. Since the buffer is per write session, consider the"
+      "This setting is deprecated. See tserver.total.mutation.queue.max. " 
+          + "The amount of memory to use to store write-ahead-log mutations-per-session before flushing them. Since the buffer is per write session, consider the"
           + " max number of concurrent writer when configuring. When using Hadoop 2, Accumulo will call hsync() on the WAL . For a small number of "
           + "concurrent writers, increasing this buffer size decreases the frequncy of hsync calls. For a large number of concurrent writers a small buffers "
           + "size is ok because of group commit."),
+  TSERV_TOTAL_MUTATION_QUEUE_MAX("tserver.total.mutation.queue.max", "50M", PropertyType.MEMORY,
+      "The amount of memory used to store write-ahead-log mutations before flushing them."),
   TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN("tserver.tablet.split.midpoint.files.max", "30", PropertyType.COUNT,
       "To find a tablets split points, all index files are opened. This setting determines how many index "
           + "files can be opened at once. When there are more index files than this setting multiple passes "

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/core/src/main/java/org/apache/accumulo/core/master/thrift/TabletServerStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/master/thrift/TabletServerStatus.java
b/core/src/main/java/org/apache/accumulo/core/master/thrift/TabletServerStatus.java
index 6348537..e7e6e0e 100644
--- a/core/src/main/java/org/apache/accumulo/core/master/thrift/TabletServerStatus.java
+++ b/core/src/main/java/org/apache/accumulo/core/master/thrift/TabletServerStatus.java
@@ -62,6 +62,8 @@ import org.slf4j.LoggerFactory;
   private static final org.apache.thrift.protocol.TField DATA_CACHE_HITS_FIELD_DESC = new
org.apache.thrift.protocol.TField("dataCacheHits", org.apache.thrift.protocol.TType.I64, (short)12);
   private static final org.apache.thrift.protocol.TField DATA_CACHE_REQUEST_FIELD_DESC =
new org.apache.thrift.protocol.TField("dataCacheRequest", org.apache.thrift.protocol.TType.I64,
(short)13);
   private static final org.apache.thrift.protocol.TField LOG_SORTS_FIELD_DESC = new org.apache.thrift.protocol.TField("logSorts",
org.apache.thrift.protocol.TType.LIST, (short)14);
+  private static final org.apache.thrift.protocol.TField FLUSHS_FIELD_DESC = new org.apache.thrift.protocol.TField("flushs",
org.apache.thrift.protocol.TType.I64, (short)15);
+  private static final org.apache.thrift.protocol.TField SYNCS_FIELD_DESC = new org.apache.thrift.protocol.TField("syncs",
org.apache.thrift.protocol.TType.I64, (short)16);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes =
new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -80,6 +82,8 @@ import org.slf4j.LoggerFactory;
   public long dataCacheHits; // required
   public long dataCacheRequest; // required
   public List<RecoveryStatus> logSorts; // required
+  public long flushs; // required
+  public long syncs; // required
 
   /** The set of fields this struct contains, along with convenience methods for finding
and manipulating them. */
   @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum
{
@@ -93,7 +97,9 @@ import org.slf4j.LoggerFactory;
     INDEX_CACHE_REQUEST((short)11, "indexCacheRequest"),
     DATA_CACHE_HITS((short)12, "dataCacheHits"),
     DATA_CACHE_REQUEST((short)13, "dataCacheRequest"),
-    LOG_SORTS((short)14, "logSorts");
+    LOG_SORTS((short)14, "logSorts"),
+    FLUSHS((short)15, "flushs"),
+    SYNCS((short)16, "syncs");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -130,6 +136,10 @@ import org.slf4j.LoggerFactory;
           return DATA_CACHE_REQUEST;
         case 14: // LOG_SORTS
           return LOG_SORTS;
+        case 15: // FLUSHS
+          return FLUSHS;
+        case 16: // SYNCS
+          return SYNCS;
         default:
           return null;
       }
@@ -178,7 +188,9 @@ import org.slf4j.LoggerFactory;
   private static final int __INDEXCACHEREQUEST_ISSET_ID = 5;
   private static final int __DATACACHEHITS_ISSET_ID = 6;
   private static final int __DATACACHEREQUEST_ISSET_ID = 7;
-  private byte __isset_bitfield = 0;
+  private static final int __FLUSHS_ISSET_ID = 8;
+  private static final int __SYNCS_ISSET_ID = 9;
+  private short __isset_bitfield = 0;
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields,
org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -207,6 +219,10 @@ import org.slf4j.LoggerFactory;
     tmpMap.put(_Fields.LOG_SORTS, new org.apache.thrift.meta_data.FieldMetaData("logSorts", org.apache.thrift.TFieldRequirementType.DEFAULT, 
         new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
             new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, RecoveryStatus.class))));
+    tmpMap.put(_Fields.FLUSHS, new org.apache.thrift.meta_data.FieldMetaData("flushs", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    tmpMap.put(_Fields.SYNCS, new org.apache.thrift.meta_data.FieldMetaData("syncs", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TabletServerStatus.class,
metaDataMap);
   }
@@ -225,7 +241,9 @@ import org.slf4j.LoggerFactory;
     long indexCacheRequest,
     long dataCacheHits,
     long dataCacheRequest,
-    List<RecoveryStatus> logSorts)
+    List<RecoveryStatus> logSorts,
+    long flushs,
+    long syncs)
   {
     this();
     this.tableMap = tableMap;
@@ -247,6 +265,10 @@ import org.slf4j.LoggerFactory;
     this.dataCacheRequest = dataCacheRequest;
     setDataCacheRequestIsSet(true);
     this.logSorts = logSorts;
+    this.flushs = flushs;
+    setFlushsIsSet(true);
+    this.syncs = syncs;
+    setSyncsIsSet(true);
   }
 
   /**
@@ -287,6 +309,8 @@ import org.slf4j.LoggerFactory;
       }
       this.logSorts = __this__logSorts;
     }
+    this.flushs = other.flushs;
+    this.syncs = other.syncs;
   }
 
   public TabletServerStatus deepCopy() {
@@ -314,6 +338,10 @@ import org.slf4j.LoggerFactory;
     setDataCacheRequestIsSet(false);
     this.dataCacheRequest = 0;
     this.logSorts = null;
+    setFlushsIsSet(false);
+    this.flushs = 0;
+    setSyncsIsSet(false);
+    this.syncs = 0;
   }
 
   public int getTableMapSize() {
@@ -598,6 +626,52 @@ import org.slf4j.LoggerFactory;
     }
   }
 
+  public long getFlushs() {
+    return this.flushs;
+  }
+
+  public TabletServerStatus setFlushs(long flushs) {
+    this.flushs = flushs;
+    setFlushsIsSet(true);
+    return this;
+  }
+
+  public void unsetFlushs() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __FLUSHS_ISSET_ID);
+  }
+
+  /** Returns true if field flushs is set (has been assigned a value) and false otherwise */
+  public boolean isSetFlushs() {
+    return EncodingUtils.testBit(__isset_bitfield, __FLUSHS_ISSET_ID);
+  }
+
+  public void setFlushsIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __FLUSHS_ISSET_ID, value);
+  }
+
+  public long getSyncs() {
+    return this.syncs;
+  }
+
+  public TabletServerStatus setSyncs(long syncs) {
+    this.syncs = syncs;
+    setSyncsIsSet(true);
+    return this;
+  }
+
+  public void unsetSyncs() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __SYNCS_ISSET_ID);
+  }
+
+  /** Returns true if field syncs is set (has been assigned a value) and false otherwise */
+  public boolean isSetSyncs() {
+    return EncodingUtils.testBit(__isset_bitfield, __SYNCS_ISSET_ID);
+  }
+
+  public void setSyncsIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __SYNCS_ISSET_ID, value);
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case TABLE_MAP:
@@ -688,6 +762,22 @@ import org.slf4j.LoggerFactory;
       }
       break;
 
+    case FLUSHS:
+      if (value == null) {
+        unsetFlushs();
+      } else {
+        setFlushs((Long)value);
+      }
+      break;
+
+    case SYNCS:
+      if (value == null) {
+        unsetSyncs();
+      } else {
+        setSyncs((Long)value);
+      }
+      break;
+
     }
   }
 
@@ -726,6 +816,12 @@ import org.slf4j.LoggerFactory;
     case LOG_SORTS:
       return getLogSorts();
 
+    case FLUSHS:
+      return Long.valueOf(getFlushs());
+
+    case SYNCS:
+      return Long.valueOf(getSyncs());
+
     }
     throw new IllegalStateException();
   }
@@ -759,6 +855,10 @@ import org.slf4j.LoggerFactory;
       return isSetDataCacheRequest();
     case LOG_SORTS:
       return isSetLogSorts();
+    case FLUSHS:
+      return isSetFlushs();
+    case SYNCS:
+      return isSetSyncs();
     }
     throw new IllegalStateException();
   }
@@ -875,6 +975,24 @@ import org.slf4j.LoggerFactory;
         return false;
     }
 
+    boolean this_present_flushs = true;
+    boolean that_present_flushs = true;
+    if (this_present_flushs || that_present_flushs) {
+      if (!(this_present_flushs && that_present_flushs))
+        return false;
+      if (this.flushs != that.flushs)
+        return false;
+    }
+
+    boolean this_present_syncs = true;
+    boolean that_present_syncs = true;
+    if (this_present_syncs || that_present_syncs) {
+      if (!(this_present_syncs && that_present_syncs))
+        return false;
+      if (this.syncs != that.syncs)
+        return false;
+    }
+
     return true;
   }
 
@@ -1001,6 +1119,26 @@ import org.slf4j.LoggerFactory;
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetFlushs()).compareTo(other.isSetFlushs());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetFlushs()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.flushs, other.flushs);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetSyncs()).compareTo(other.isSetSyncs());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetSyncs()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.syncs, other.syncs);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -1076,6 +1214,14 @@ import org.slf4j.LoggerFactory;
       sb.append(this.logSorts);
     }
     first = false;
+    if (!first) sb.append(", ");
+    sb.append("flushs:");
+    sb.append(this.flushs);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("syncs:");
+    sb.append(this.syncs);
+    first = false;
     sb.append(")");
     return sb.toString();
   }
@@ -1233,6 +1379,22 @@ import org.slf4j.LoggerFactory;
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 15: // FLUSHS
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.flushs = iprot.readI64();
+              struct.setFlushsIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 16: // SYNCS
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.syncs = iprot.readI64();
+              struct.setSyncsIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -1302,6 +1464,12 @@ import org.slf4j.LoggerFactory;
         }
         oprot.writeFieldEnd();
       }
+      oprot.writeFieldBegin(FLUSHS_FIELD_DESC);
+      oprot.writeI64(struct.flushs);
+      oprot.writeFieldEnd();
+      oprot.writeFieldBegin(SYNCS_FIELD_DESC);
+      oprot.writeI64(struct.syncs);
+      oprot.writeFieldEnd();
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1353,7 +1521,13 @@ import org.slf4j.LoggerFactory;
       if (struct.isSetLogSorts()) {
         optionals.set(10);
       }
-      oprot.writeBitSet(optionals, 11);
+      if (struct.isSetFlushs()) {
+        optionals.set(11);
+      }
+      if (struct.isSetSyncs()) {
+        optionals.set(12);
+      }
+      oprot.writeBitSet(optionals, 13);
       if (struct.isSetTableMap()) {
         {
           oprot.writeI32(struct.tableMap.size());
@@ -1400,12 +1574,18 @@ import org.slf4j.LoggerFactory;
           }
         }
       }
+      if (struct.isSetFlushs()) {
+        oprot.writeI64(struct.flushs);
+      }
+      if (struct.isSetSyncs()) {
+        oprot.writeI64(struct.syncs);
+      }
     }
 
     @Override
     public void read(org.apache.thrift.protocol.TProtocol prot, TabletServerStatus struct)
throws org.apache.thrift.TException {
       TTupleProtocol iprot = (TTupleProtocol) prot;
-      BitSet incoming = iprot.readBitSet(11);
+      BitSet incoming = iprot.readBitSet(13);
       if (incoming.get(0)) {
         {
           org.apache.thrift.protocol.TMap _map11 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING,
org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
@@ -1472,6 +1652,14 @@ import org.slf4j.LoggerFactory;
         }
         struct.setLogSortsIsSet(true);
       }
+      if (incoming.get(11)) {
+        struct.flushs = iprot.readI64();
+        struct.setFlushsIsSet(true);
+      }
+      if (incoming.get(12)) {
+        struct.syncs = iprot.readI64();
+        struct.setSyncsIsSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/core/src/main/thrift/master.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/master.thrift b/core/src/main/thrift/master.thrift
index 49fa262..72ba3a5 100644
--- a/core/src/main/thrift/master.thrift
+++ b/core/src/main/thrift/master.thrift
@@ -60,6 +60,8 @@ struct TabletServerStatus {
   12:i64 dataCacheHits
   13:i64 dataCacheRequest
   14:list<RecoveryStatus> logSorts
+  15:i64 flushs
+  16:i64 syncs
 }
 
 enum MasterState {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index dc9f27f..b4fbfed 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -263,6 +263,9 @@ public class TabletServer implements Runnable {
   private final TabletStatsKeeper statsKeeper;
   private final AtomicInteger logIdGenerator = new AtomicInteger();
 
+  private final AtomicLong flushCounter = new AtomicLong(0);
+  private final AtomicLong syncCounter = new AtomicLong(0);
+  
   private final VolumeManager fs;
   public Instance getInstance() {
     return serverConfig.getInstance();
@@ -333,7 +336,7 @@ public class TabletServer implements Runnable {
     if (minBlockSize != 0 && minBlockSize > walogMaxSize)
       throw new RuntimeException("Unable to start TabletServer. Logger is set to use blocksize " + walogMaxSize + " but hdfs minimum block size is "
           + minBlockSize + ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE + " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml.");
-    logger = new TabletServerLogger(this, walogMaxSize);
+    logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter);
     this.resourceManager = new TabletServerResourceManager(getInstance(), fs);
   }
 
@@ -351,6 +354,8 @@ public class TabletServer implements Runnable {
 
   private final RowLocks rowLocks = new RowLocks();
   
+  private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
+
   private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface
{
 
     ThriftClientHandler() {
@@ -727,13 +732,17 @@ public class TabletServer implements Runnable {
         setUpdateTablet(us, keyExtent);
 
         if (us.currentTablet != null) {
+          long additionalMutationSize = 0;
           List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
           for (TMutation tmutation : tmutations) {
             Mutation mutation = new ServerMutation(tmutation);
             mutations.add(mutation);
-            us.queuedMutationSize += mutation.numBytes();
+            additionalMutationSize += mutation.numBytes();
           }
-          if (us.queuedMutationSize > TabletServer.this.getConfiguration().getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX))
{
+          us.queuedMutationSize += additionalMutationSize;
+          long totalQueued = updateTotalQueuedMutationSize(additionalMutationSize);
+          long total = TabletServer.this.getConfiguration().getMemoryInBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
+          if (totalQueued > total) {
             flush(us);
           }
         }
@@ -777,7 +786,6 @@ public class TabletServer implements Runnable {
                 }
                 us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
               } else {
-                log.debug("Durablity for " + tablet.getExtent() + " durability " + us.durability
+ " table durability " + tabletDurability + " using " + DurabilityImpl.resolveDurabilty(us.durability,
tabletDurability));
                 sendables.put(commitSession, new Mutations(DurabilityImpl.resolveDurabilty(us.durability,
tabletDurability), mutations));
                 mutationCount += mutations.size();
               }
@@ -789,10 +797,8 @@ public class TabletServer implements Runnable {
 
               if (e.getNonViolators().size() > 0) {
                 // only log and commit mutations if there were some
-                // that did not
-                // violate constraints... this is what
-                // prepareMutationsForCommit()
-                // expects
+                // that did not violate constraints... this is what
+                // prepareMutationsForCommit() expects
                 sendables.put(e.getCommitSession(), new Mutations(DurabilityImpl.resolveDurabilty(us.durability,
tabletDurability), e.getNonViolators()));
               }
 
@@ -882,6 +888,7 @@ public class TabletServer implements Runnable {
         if (us.currentTablet != null) {
           us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
         }
+        updateTotalQueuedMutationSize(-us.queuedMutationSize);
         us.queuedMutationSize = 0;
       }
       us.totalUpdates += mutationCount;
@@ -1758,6 +1765,10 @@ public class TabletServer implements Runnable {
     return majorCompactorDisabled;
   }
 
+  public long updateTotalQueuedMutationSize(long additionalMutationSize) {
+    return totalQueuedMutationSize.addAndGet(additionalMutationSize);
+  }
+
   public Tablet getOnlineTablet(KeyExtent extent) {
     return onlineTablets.get(extent);
   }
@@ -2845,6 +2856,8 @@ public class TabletServer implements Runnable {
     result.dataCacheHits = resourceManager.getDataCache().getStats().getHitCount();
     result.dataCacheRequest = resourceManager.getDataCache().getStats().getRequestCount();
     result.logSorts = logSorter.getLogSorts();
+    result.flushs = flushCounter.get();
+    result.syncs = syncCounter.get();
     return result;
   }
 
@@ -2961,5 +2974,4 @@ public class TabletServer implements Runnable {
   public double getHoldTimeMillis() {
     return resourceManager.holdTime();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 3d42c7c..25c0ee8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -156,7 +156,7 @@ public class TabletServerResourceManager {
     long blockSize = acuConf.getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE);
     long dCacheSize = acuConf.getMemoryInBytes(Property.TSERV_DATACACHE_SIZE);
     long iCacheSize = acuConf.getMemoryInBytes(Property.TSERV_INDEXCACHE_SIZE);
-    long totalQueueSize = acuConf.getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX);
+    long totalQueueSize = acuConf.getMemoryInBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
 
     _iCache = new LruBlockCache(iCacheSize, blockSize);
     _dCache = new LruBlockCache(dCacheSize, blockSize);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 50475c2..6747c14 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -39,6 +39,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.client.Durability;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -169,6 +170,11 @@ public class DfsLogger {
         try {
           if (durabilityMethod != null) {
             durabilityMethod.invoke(logFile);
+            if (durabilityMethod == sync) {
+              syncCounter.incrementAndGet();
+            } else {
+              flushCounter.incrementAndGet();
+            }
           }
         } catch (Exception ex) {
           log.warn("Exception syncing " + ex);
@@ -248,9 +254,13 @@ public class DfsLogger {
 
   /* Track what's actually in +r/!0 for this logger ref */
   private String metaReference;
+  private AtomicLong syncCounter;
+  private AtomicLong flushCounter;
 
-  public DfsLogger(ServerResources conf) throws IOException {
+  public DfsLogger(ServerResources conf, AtomicLong syncCounter, AtomicLong flushCounter)
throws IOException {
     this.conf = conf;
+    this.syncCounter = syncCounter;
+    this.flushCounter = flushCounter;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 2a540e5..bdd3016 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -84,6 +84,9 @@ public class TabletServerLogger {
 
   private final AtomicInteger seqGen = new AtomicInteger();
 
+  private final AtomicLong syncCounter;
+  private final AtomicLong flushCounter;
+
   static private abstract class TestCallWithWriteLock {
     abstract boolean test();
 
@@ -128,9 +131,11 @@ public class TabletServerLogger {
     }
   }
 
-  public TabletServerLogger(TabletServer tserver, long maxSize) {
+  public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong syncCounter, AtomicLong
flushCounter) {
     this.tserver = tserver;
     this.maxSize = maxSize;
+    this.syncCounter = syncCounter;
+    this.flushCounter = flushCounter;
   }
 
   private int initializeLoggers(final List<DfsLogger> copy) throws IOException {
@@ -184,7 +189,7 @@ public class TabletServerLogger {
     }
 
     try {
-      DfsLogger alog = new DfsLogger(tserver.getServerConfig());
+      DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
       alog.open(tserver.getClientAddressString());
       loggers.add(alog);
       logSetId.incrementAndGet();
@@ -381,8 +386,7 @@ public class TabletServerLogger {
 
     final Map<CommitSession,Mutations> loggables = new HashMap<CommitSession,Mutations>(mutations);
     for (Entry<CommitSession,Mutations> entry : mutations.entrySet()) {
-      Durability durability = entry.getValue().getDurability();
-      if (durability == Durability.NONE) {
+      if (entry.getValue().getDurability() == Durability.NONE) {
         loggables.remove(entry.getKey());
       }
     }
@@ -402,8 +406,9 @@ public class TabletServerLogger {
       }
     });
     for (Mutations entry : loggables.values()) {
-      if (entry.getMutations().size() < 1)
+      if (entry.getMutations().size() < 1) {
         throw new IllegalArgumentException("logManyTablets: logging empty mutation list");
+      }
       for (Mutation m : entry.getMutations()) {
         logSizeEstimate.addAndGet(m.numBytes());
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java
b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java
index 4af85a7..8d14574 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java
@@ -72,7 +72,7 @@ public class Config extends Test {
       s(Property.TSERV_MAXMEM, 1000000, 3 * 1024 * 1024 * 1024L),
       s(Property.TSERV_READ_AHEAD_MAXCONCURRENT, 1, 25),
       s(Property.TSERV_MIGRATE_MAXCONCURRENT, 1, 10),
-      s(Property.TSERV_MUTATION_QUEUE_MAX, 10000, 1024 * 1024),
+      s(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, 10000, 1024 * 1024),
       s(Property.TSERV_RECOVERY_MAX_CONCURRENT, 1, 100),
       s(Property.TSERV_SCAN_MAX_OPENFILES, 10, 1000),
       s(Property.TSERV_THREADCHECK, 100, 10000),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java b/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java
new file mode 100644
index 0000000..a794088
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+// see ACCUMULO-1950
+public class TotalQueuedIT extends ConfigurableMacIT {
+  
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+    cfg.setDefaultMemory(cfg.getDefaultMemory() * 2, MemoryUnit.BYTE);
+    cfg.useMiniDFS();
+  }
+  
+  int SMALL_QUEUE_SIZE = 100000;
+  int LARGE_QUEUE_SIZE = SMALL_QUEUE_SIZE * 10;
+  static final long N = 1000000;
+
+  @Test(timeout = 4 * 60 * 1000)
+  public void test() throws Exception {
+    Random random = new Random();
+    Connector c = getConnector();
+    c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + SMALL_QUEUE_SIZE);
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "9999");
+    c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "999");
+    UtilWaitThread.sleep(1000);
+    // get an idea of how fast the syncs occur
+    byte row[] = new byte[250];
+    BatchWriterConfig cfg = new BatchWriterConfig();
+    cfg.setMaxWriteThreads(10);
+    cfg.setMaxLatency(1, TimeUnit.SECONDS);
+    cfg.setMaxMemory(1024*1024);
+    long realSyncs = getSyncs();
+    BatchWriter bw = c.createBatchWriter(tableName, cfg);
+    long now = System.currentTimeMillis();
+    long bytesSent = 0;
+    for (int i = 0; i < N; i++) {
+      random.nextBytes(row);
+      Mutation m = new Mutation(row);
+      m.put("", "", "");
+      bw.addMutation(m);
+      bytesSent += m.estimatedMemoryUsed();
+    }
+    bw.close();
+    long diff = System.currentTimeMillis() - now;
+    double secs = diff / 1000.;
+    double syncs = bytesSent / SMALL_QUEUE_SIZE;
+    double syncsPerSec = syncs / secs;
+    System.out.println(String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", bytesSent, secs, ((long)syncs), syncsPerSec));
+    long update = getSyncs();
+    System.out.println("Syncs " + (update - realSyncs));
+    realSyncs = update;
+    
+    // Now with a much bigger total queue
+    c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + LARGE_QUEUE_SIZE);
+    c.tableOperations().flush(tableName, null, null, true);
+    UtilWaitThread.sleep(1000);
+    bw = c.createBatchWriter(tableName, cfg);
+    now = System.currentTimeMillis();
+    bytesSent = 0;
+    for (int i = 0; i < N; i++) {
+      random.nextBytes(row);
+      Mutation m = new Mutation(row);
+      m.put("", "", "");
+      bw.addMutation(m);
+      bytesSent += m.estimatedMemoryUsed();
+    }
+    bw.close();
+    diff = System.currentTimeMillis() - now;
+    secs = diff / 1000.;
+    syncs = bytesSent / LARGE_QUEUE_SIZE;
+    syncsPerSec = syncs / secs;
+    System.out.println(String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", bytesSent, secs, ((long)syncs), syncsPerSec));
+    update = getSyncs();
+    System.out.println("Syncs " + (update - realSyncs));
+    assertTrue(update - realSyncs < realSyncs);
+  }
+
+  private long getSyncs() throws Exception {
+    Connector c = getConnector();
+    Credentials credentials = new Credentials("root", new PasswordToken(ROOT_PASSWORD.getBytes()));
+    for (String address : c.instanceOperations().getTabletServers()) {
+      TabletClientService.Client client = ThriftUtil.getTServerClient(address, DefaultConfiguration.getDefaultConfiguration());
+      TabletServerStatus status = client.getTabletServerStatus(null, credentials.toThrift(c.getInstance()));
+      return status.syncs;
+    }
+    return 0;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java
index 8f6b830..6ee671e 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java
@@ -57,7 +57,7 @@ public class BloomFilterIT extends ConfigurableMacIT {
     siteConfig.put(Property.TABLE_BLOOM_SIZE.getKey(), "2000000");
     siteConfig.put(Property.TABLE_BLOOM_ERRORRATE.getKey(), "1%");
     siteConfig.put(Property.TABLE_BLOOM_LOAD_THRESHOLD.getKey(), "0");
-    siteConfig.put(Property.TSERV_MUTATION_QUEUE_MAX.getKey(), "10M");
+    siteConfig.put(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "10M");
     siteConfig.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64K");
     cfg.setSiteConfig(siteConfig );
   }


Mime
View raw message