From: ecn@apache.org
To: commits@accumulo.apache.org
Subject: git commit: ACCUMULO-1950 use server-wide memory count
Date: Mon, 8 Sep 2014 21:03:41 +0000 (UTC)

Repository: accumulo
Updated Branches:
  refs/heads/master 2533b7eff -> ef701cabb


ACCUMULO-1950 use server-wide memory count


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ef701cab
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ef701cab
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ef701cab

Branch: refs/heads/master
Commit: ef701cabbd5e1fed993bbd13e86c40b7d72040d0
Parents: 2533b7e
Author: Eric C. Newton
Authored: Mon Sep 8 16:38:49 2014 -0400
Committer: Eric C.
Newton Committed: Mon Sep 8 16:38:49 2014 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 6 +- .../core/master/thrift/TabletServerStatus.java | 198 ++++++++++++++++++- core/src/main/thrift/master.thrift | 2 + .../apache/accumulo/tserver/TabletServer.java | 30 ++- .../tserver/TabletServerResourceManager.java | 2 +- .../apache/accumulo/tserver/log/DfsLogger.java | 12 +- .../tserver/log/TabletServerLogger.java | 15 +- .../test/randomwalk/concurrent/Config.java | 2 +- .../org/apache/accumulo/test/TotalQueuedIT.java | 129 ++++++++++++ .../accumulo/test/functional/BloomFilterIT.java | 2 +- 10 files changed, 374 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 5401c7c..35cd0a6 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -197,11 +197,15 @@ public enum Property { TSERV_INDEXCACHE_SIZE("tserver.cache.index.size", "512M", PropertyType.MEMORY, "Specifies the size of the cache for file indices."), TSERV_PORTSEARCH("tserver.port.search", "false", PropertyType.BOOLEAN, "if the ports above are in use, search higher ports until one is available"), TSERV_CLIENTPORT("tserver.port.client", "9997", PropertyType.PORT, "The port used for handling client connections on the tablet servers"), + @Deprecated TSERV_MUTATION_QUEUE_MAX("tserver.mutation.queue.max", "1M", PropertyType.MEMORY, - "The amount of memory to use to store write-ahead-log mutations-per-session before flushing them. Since the buffer is per write session, consider the" + "This setting is deprecated. See tserver.total.mutation.queue.max. " + + "The amount of memory to use to store write-ahead-log mutations-per-session before flushing them. Since the buffer is per write session, consider the" + " max number of concurrent writer when configuring. When using Hadoop 2, Accumulo will call hsync() on the WAL . For a small number of " + "concurrent writers, increasing this buffer size decreases the frequncy of hsync calls. For a large number of concurrent writers a small buffers " + "size is ok because of group commit."), + TSERV_TOTAL_MUTATION_QUEUE_MAX("tserver.total.mutation.queue.max", "50M", PropertyType.MEMORY, + "The amount of memory used to store write-ahead-log mutations before flushing them."), TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN("tserver.tablet.split.midpoint.files.max", "30", PropertyType.COUNT, "To find a tablets split points, all index files are opened. This setting determines how many index " + "files can be opened at once. 
When there are more index files than this setting multiple passes " http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/core/src/main/java/org/apache/accumulo/core/master/thrift/TabletServerStatus.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/master/thrift/TabletServerStatus.java b/core/src/main/java/org/apache/accumulo/core/master/thrift/TabletServerStatus.java index 6348537..e7e6e0e 100644 --- a/core/src/main/java/org/apache/accumulo/core/master/thrift/TabletServerStatus.java +++ b/core/src/main/java/org/apache/accumulo/core/master/thrift/TabletServerStatus.java @@ -62,6 +62,8 @@ import org.slf4j.LoggerFactory; private static final org.apache.thrift.protocol.TField DATA_CACHE_HITS_FIELD_DESC = new org.apache.thrift.protocol.TField("dataCacheHits", org.apache.thrift.protocol.TType.I64, (short)12); private static final org.apache.thrift.protocol.TField DATA_CACHE_REQUEST_FIELD_DESC = new org.apache.thrift.protocol.TField("dataCacheRequest", org.apache.thrift.protocol.TType.I64, (short)13); private static final org.apache.thrift.protocol.TField LOG_SORTS_FIELD_DESC = new org.apache.thrift.protocol.TField("logSorts", org.apache.thrift.protocol.TType.LIST, (short)14); + private static final org.apache.thrift.protocol.TField FLUSHS_FIELD_DESC = new org.apache.thrift.protocol.TField("flushs", org.apache.thrift.protocol.TType.I64, (short)15); + private static final org.apache.thrift.protocol.TField SYNCS_FIELD_DESC = new org.apache.thrift.protocol.TField("syncs", org.apache.thrift.protocol.TType.I64, (short)16); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -80,6 +82,8 @@ import org.slf4j.LoggerFactory; public long dataCacheHits; // required public long dataCacheRequest; // required public List logSorts; // required + public long flushs; // required + public long syncs; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
*/ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -93,7 +97,9 @@ import org.slf4j.LoggerFactory; INDEX_CACHE_REQUEST((short)11, "indexCacheRequest"), DATA_CACHE_HITS((short)12, "dataCacheHits"), DATA_CACHE_REQUEST((short)13, "dataCacheRequest"), - LOG_SORTS((short)14, "logSorts"); + LOG_SORTS((short)14, "logSorts"), + FLUSHS((short)15, "flushs"), + SYNCS((short)16, "syncs"); private static final Map byName = new HashMap(); @@ -130,6 +136,10 @@ import org.slf4j.LoggerFactory; return DATA_CACHE_REQUEST; case 14: // LOG_SORTS return LOG_SORTS; + case 15: // FLUSHS + return FLUSHS; + case 16: // SYNCS + return SYNCS; default: return null; } @@ -178,7 +188,9 @@ import org.slf4j.LoggerFactory; private static final int __INDEXCACHEREQUEST_ISSET_ID = 5; private static final int __DATACACHEHITS_ISSET_ID = 6; private static final int __DATACACHEREQUEST_ISSET_ID = 7; - private byte __isset_bitfield = 0; + private static final int __FLUSHS_ISSET_ID = 8; + private static final int __SYNCS_ISSET_ID = 9; + private short __isset_bitfield = 0; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -207,6 +219,10 @@ import org.slf4j.LoggerFactory; tmpMap.put(_Fields.LOG_SORTS, new org.apache.thrift.meta_data.FieldMetaData("logSorts", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, RecoveryStatus.class)))); + tmpMap.put(_Fields.FLUSHS, new org.apache.thrift.meta_data.FieldMetaData("flushs", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.SYNCS, new org.apache.thrift.meta_data.FieldMetaData("syncs", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TabletServerStatus.class, metaDataMap); } @@ -225,7 +241,9 @@ import org.slf4j.LoggerFactory; long indexCacheRequest, long dataCacheHits, long dataCacheRequest, - List logSorts) + List logSorts, + long flushs, + long syncs) { this(); this.tableMap = tableMap; @@ -247,6 +265,10 @@ import org.slf4j.LoggerFactory; this.dataCacheRequest = dataCacheRequest; setDataCacheRequestIsSet(true); this.logSorts = logSorts; + this.flushs = flushs; + setFlushsIsSet(true); + this.syncs = syncs; + setSyncsIsSet(true); } /** @@ -287,6 +309,8 @@ import org.slf4j.LoggerFactory; } this.logSorts = __this__logSorts; } + this.flushs = other.flushs; + this.syncs = other.syncs; } public TabletServerStatus deepCopy() { @@ -314,6 +338,10 @@ import org.slf4j.LoggerFactory; setDataCacheRequestIsSet(false); this.dataCacheRequest = 0; this.logSorts = null; + setFlushsIsSet(false); + this.flushs = 0; + setSyncsIsSet(false); + this.syncs = 0; } public int getTableMapSize() { @@ -598,6 +626,52 @@ import org.slf4j.LoggerFactory; } } + public long getFlushs() { + return this.flushs; + } + + public TabletServerStatus setFlushs(long flushs) { + this.flushs = flushs; + setFlushsIsSet(true); + return this; + } + + public void unsetFlushs() { + __isset_bitfield = 
EncodingUtils.clearBit(__isset_bitfield, __FLUSHS_ISSET_ID); + } + + /** Returns true if field flushs is set (has been assigned a value) and false otherwise */ + public boolean isSetFlushs() { + return EncodingUtils.testBit(__isset_bitfield, __FLUSHS_ISSET_ID); + } + + public void setFlushsIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __FLUSHS_ISSET_ID, value); + } + + public long getSyncs() { + return this.syncs; + } + + public TabletServerStatus setSyncs(long syncs) { + this.syncs = syncs; + setSyncsIsSet(true); + return this; + } + + public void unsetSyncs() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __SYNCS_ISSET_ID); + } + + /** Returns true if field syncs is set (has been assigned a value) and false otherwise */ + public boolean isSetSyncs() { + return EncodingUtils.testBit(__isset_bitfield, __SYNCS_ISSET_ID); + } + + public void setSyncsIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __SYNCS_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case TABLE_MAP: @@ -688,6 +762,22 @@ import org.slf4j.LoggerFactory; } break; + case FLUSHS: + if (value == null) { + unsetFlushs(); + } else { + setFlushs((Long)value); + } + break; + + case SYNCS: + if (value == null) { + unsetSyncs(); + } else { + setSyncs((Long)value); + } + break; + } } @@ -726,6 +816,12 @@ import org.slf4j.LoggerFactory; case LOG_SORTS: return getLogSorts(); + case FLUSHS: + return Long.valueOf(getFlushs()); + + case SYNCS: + return Long.valueOf(getSyncs()); + } throw new IllegalStateException(); } @@ -759,6 +855,10 @@ import org.slf4j.LoggerFactory; return isSetDataCacheRequest(); case LOG_SORTS: return isSetLogSorts(); + case FLUSHS: + return isSetFlushs(); + case SYNCS: + return isSetSyncs(); } throw new IllegalStateException(); } @@ -875,6 +975,24 @@ import org.slf4j.LoggerFactory; return false; } + boolean this_present_flushs = true; + boolean that_present_flushs = true; + if (this_present_flushs || that_present_flushs) { + if (!(this_present_flushs && that_present_flushs)) + return false; + if (this.flushs != that.flushs) + return false; + } + + boolean this_present_syncs = true; + boolean that_present_syncs = true; + if (this_present_syncs || that_present_syncs) { + if (!(this_present_syncs && that_present_syncs)) + return false; + if (this.syncs != that.syncs) + return false; + } + return true; } @@ -1001,6 +1119,26 @@ import org.slf4j.LoggerFactory; return lastComparison; } } + lastComparison = Boolean.valueOf(isSetFlushs()).compareTo(other.isSetFlushs()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetFlushs()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.flushs, other.flushs); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetSyncs()).compareTo(other.isSetSyncs()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetSyncs()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.syncs, other.syncs); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -1076,6 +1214,14 @@ import org.slf4j.LoggerFactory; sb.append(this.logSorts); } first = false; + if (!first) sb.append(", "); + sb.append("flushs:"); + sb.append(this.flushs); + first = false; + if (!first) sb.append(", "); + sb.append("syncs:"); + sb.append(this.syncs); + first = false; sb.append(")"); return sb.toString(); } @@ -1233,6 +1379,22 @@ import org.slf4j.LoggerFactory; 
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 15: // FLUSHS + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.flushs = iprot.readI64(); + struct.setFlushsIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 16: // SYNCS + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.syncs = iprot.readI64(); + struct.setSyncsIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1302,6 +1464,12 @@ import org.slf4j.LoggerFactory; } oprot.writeFieldEnd(); } + oprot.writeFieldBegin(FLUSHS_FIELD_DESC); + oprot.writeI64(struct.flushs); + oprot.writeFieldEnd(); + oprot.writeFieldBegin(SYNCS_FIELD_DESC); + oprot.writeI64(struct.syncs); + oprot.writeFieldEnd(); oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1353,7 +1521,13 @@ import org.slf4j.LoggerFactory; if (struct.isSetLogSorts()) { optionals.set(10); } - oprot.writeBitSet(optionals, 11); + if (struct.isSetFlushs()) { + optionals.set(11); + } + if (struct.isSetSyncs()) { + optionals.set(12); + } + oprot.writeBitSet(optionals, 13); if (struct.isSetTableMap()) { { oprot.writeI32(struct.tableMap.size()); @@ -1400,12 +1574,18 @@ import org.slf4j.LoggerFactory; } } } + if (struct.isSetFlushs()) { + oprot.writeI64(struct.flushs); + } + if (struct.isSetSyncs()) { + oprot.writeI64(struct.syncs); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, TabletServerStatus struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; - BitSet incoming = iprot.readBitSet(11); + BitSet incoming = iprot.readBitSet(13); if (incoming.get(0)) { { org.apache.thrift.protocol.TMap _map11 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); @@ -1472,6 +1652,14 @@ import org.slf4j.LoggerFactory; } struct.setLogSortsIsSet(true); } + if (incoming.get(11)) { + struct.flushs = iprot.readI64(); + struct.setFlushsIsSet(true); + } + if (incoming.get(12)) { + struct.syncs = iprot.readI64(); + struct.setSyncsIsSet(true); + } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/core/src/main/thrift/master.thrift ---------------------------------------------------------------------- diff --git a/core/src/main/thrift/master.thrift b/core/src/main/thrift/master.thrift index 49fa262..72ba3a5 100644 --- a/core/src/main/thrift/master.thrift +++ b/core/src/main/thrift/master.thrift @@ -60,6 +60,8 @@ struct TabletServerStatus { 12:i64 dataCacheHits 13:i64 dataCacheRequest 14:list logSorts + 15:i64 flushs + 16:i64 syncs } enum MasterState { http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index dc9f27f..b4fbfed 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -263,6 +263,9 @@ public class TabletServer implements Runnable { private final TabletStatsKeeper statsKeeper; private final AtomicInteger logIdGenerator = new 
AtomicInteger(); + private final AtomicLong flushCounter = new AtomicLong(0); + private final AtomicLong syncCounter = new AtomicLong(0); + private final VolumeManager fs; public Instance getInstance() { return serverConfig.getInstance(); @@ -333,7 +336,7 @@ public class TabletServer implements Runnable { if (minBlockSize != 0 && minBlockSize > walogMaxSize) throw new RuntimeException("Unable to start TabletServer. Logger is set to use blocksize " + walogMaxSize + " but hdfs minimum block size is " + minBlockSize + ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE + " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml."); - logger = new TabletServerLogger(this, walogMaxSize); + logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter); this.resourceManager = new TabletServerResourceManager(getInstance(), fs); } @@ -351,6 +354,8 @@ public class TabletServer implements Runnable { private final RowLocks rowLocks = new RowLocks(); + private final AtomicLong totalQueuedMutationSize = new AtomicLong(0); + private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface { ThriftClientHandler() { @@ -727,13 +732,17 @@ public class TabletServer implements Runnable { setUpdateTablet(us, keyExtent); if (us.currentTablet != null) { + long additionalMutationSize = 0; List mutations = us.queuedMutations.get(us.currentTablet); for (TMutation tmutation : tmutations) { Mutation mutation = new ServerMutation(tmutation); mutations.add(mutation); - us.queuedMutationSize += mutation.numBytes(); + additionalMutationSize += mutation.numBytes(); } - if (us.queuedMutationSize > TabletServer.this.getConfiguration().getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX)) { + us.queuedMutationSize += additionalMutationSize; + long totalQueued = updateTotalQueuedMutationSize(additionalMutationSize); + long total = TabletServer.this.getConfiguration().getMemoryInBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX); + if (totalQueued > total) { flush(us); } } @@ -777,7 +786,6 @@ public class TabletServer implements Runnable { } us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet)); } else { - log.debug("Durablity for " + tablet.getExtent() + " durability " + us.durability + " table durability " + tabletDurability + " using " + DurabilityImpl.resolveDurabilty(us.durability, tabletDurability)); sendables.put(commitSession, new Mutations(DurabilityImpl.resolveDurabilty(us.durability, tabletDurability), mutations)); mutationCount += mutations.size(); } @@ -789,10 +797,8 @@ public class TabletServer implements Runnable { if (e.getNonViolators().size() > 0) { // only log and commit mutations if there were some - // that did not - // violate constraints... this is what - // prepareMutationsForCommit() - // expects + // that did not violate constraints... 
this is what + // prepareMutationsForCommit() expects sendables.put(e.getCommitSession(), new Mutations(DurabilityImpl.resolveDurabilty(us.durability, tabletDurability), e.getNonViolators())); } @@ -882,6 +888,7 @@ public class TabletServer implements Runnable { if (us.currentTablet != null) { us.queuedMutations.put(us.currentTablet, new ArrayList()); } + updateTotalQueuedMutationSize(-us.queuedMutationSize); us.queuedMutationSize = 0; } us.totalUpdates += mutationCount; @@ -1758,6 +1765,10 @@ public class TabletServer implements Runnable { return majorCompactorDisabled; } + public long updateTotalQueuedMutationSize(long additionalMutationSize) { + return totalQueuedMutationSize.addAndGet(additionalMutationSize); + } + public Tablet getOnlineTablet(KeyExtent extent) { return onlineTablets.get(extent); } @@ -2845,6 +2856,8 @@ public class TabletServer implements Runnable { result.dataCacheHits = resourceManager.getDataCache().getStats().getHitCount(); result.dataCacheRequest = resourceManager.getDataCache().getStats().getRequestCount(); result.logSorts = logSorter.getLogSorts(); + result.flushs = flushCounter.get(); + result.syncs = syncCounter.get(); return result; } @@ -2961,5 +2974,4 @@ public class TabletServer implements Runnable { public double getHoldTimeMillis() { return resourceManager.holdTime(); } - } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 3d42c7c..25c0ee8 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -156,7 +156,7 @@ public class TabletServerResourceManager { long blockSize = acuConf.getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE); long dCacheSize = acuConf.getMemoryInBytes(Property.TSERV_DATACACHE_SIZE); long iCacheSize = acuConf.getMemoryInBytes(Property.TSERV_INDEXCACHE_SIZE); - long totalQueueSize = acuConf.getMemoryInBytes(Property.TSERV_MUTATION_QUEUE_MAX); + long totalQueueSize = acuConf.getMemoryInBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX); _iCache = new LruBlockCache(iCacheSize, blockSize); _dCache = new LruBlockCache(dCacheSize, blockSize); http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index 50475c2..6747c14 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -39,6 +39,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.client.Durability; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -169,6 +170,11 @@ public class DfsLogger { try { if (durabilityMethod != null) { durabilityMethod.invoke(logFile); + if 
(durabilityMethod == sync) { + syncCounter.incrementAndGet(); + } else { + flushCounter.incrementAndGet(); + } } } catch (Exception ex) { log.warn("Exception syncing " + ex); @@ -248,9 +254,13 @@ public class DfsLogger { /* Track what's actually in +r/!0 for this logger ref */ private String metaReference; + private AtomicLong syncCounter; + private AtomicLong flushCounter; - public DfsLogger(ServerResources conf) throws IOException { + public DfsLogger(ServerResources conf, AtomicLong syncCounter, AtomicLong flushCounter) throws IOException { this.conf = conf; + this.syncCounter = syncCounter; + this.flushCounter = flushCounter; } /** http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 2a540e5..bdd3016 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -84,6 +84,9 @@ public class TabletServerLogger { private final AtomicInteger seqGen = new AtomicInteger(); + private final AtomicLong syncCounter; + private final AtomicLong flushCounter; + static private abstract class TestCallWithWriteLock { abstract boolean test(); @@ -128,9 +131,11 @@ public class TabletServerLogger { } } - public TabletServerLogger(TabletServer tserver, long maxSize) { + public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong syncCounter, AtomicLong flushCounter) { this.tserver = tserver; this.maxSize = maxSize; + this.syncCounter = syncCounter; + this.flushCounter = flushCounter; } private int initializeLoggers(final List copy) throws IOException { @@ -184,7 +189,7 @@ public class TabletServerLogger { } try { - DfsLogger alog = new DfsLogger(tserver.getServerConfig()); + DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter); alog.open(tserver.getClientAddressString()); loggers.add(alog); logSetId.incrementAndGet(); @@ -381,8 +386,7 @@ public class TabletServerLogger { final Map loggables = new HashMap(mutations); for (Entry entry : mutations.entrySet()) { - Durability durability = entry.getValue().getDurability(); - if (durability == Durability.NONE) { + if (entry.getValue().getDurability() == Durability.NONE) { loggables.remove(entry.getKey()); } } @@ -402,8 +406,9 @@ public class TabletServerLogger { } }); for (Mutations entry : loggables.values()) { - if (entry.getMutations().size() < 1) + if (entry.getMutations().size() < 1) { throw new IllegalArgumentException("logManyTablets: logging empty mutation list"); + } for (Mutation m : entry.getMutations()) { logSizeEstimate.addAndGet(m.numBytes()); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java index 4af85a7..8d14574 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/Config.java @@ -72,7 +72,7 
@@ public class Config extends Test { s(Property.TSERV_MAXMEM, 1000000, 3 * 1024 * 1024 * 1024L), s(Property.TSERV_READ_AHEAD_MAXCONCURRENT, 1, 25), s(Property.TSERV_MIGRATE_MAXCONCURRENT, 1, 10), - s(Property.TSERV_MUTATION_QUEUE_MAX, 10000, 1024 * 1024), + s(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, 10000, 1024 * 1024), s(Property.TSERV_RECOVERY_MAX_CONCURRENT, 1, 100), s(Property.TSERV_SCAN_MAX_OPENFILES, 10, 1000), s(Property.TSERV_THREADCHECK, 100, 10000), http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java b/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java new file mode 100644 index 0000000..a794088 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/TotalQueuedIT.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.Assert.assertTrue; + +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.minicluster.MemoryUnit; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +// see ACCUMULO-1950 +public class TotalQueuedIT extends ConfigurableMacIT { + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); + cfg.setDefaultMemory(cfg.getDefaultMemory() * 2, MemoryUnit.BYTE); + cfg.useMiniDFS(); + } + + int SMALL_QUEUE_SIZE = 100000; + int LARGE_QUEUE_SIZE = SMALL_QUEUE_SIZE * 10; + static final long N = 1000000; + + @Test(timeout = 4 * 60 * 1000) + public void test() throws Exception { + Random random = new Random(); + Connector c = getConnector(); + c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + SMALL_QUEUE_SIZE); + String tableName = 
getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "9999"); + c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "999"); + UtilWaitThread.sleep(1000); + // get an idea of how fast the syncs occur + byte row[] = new byte[250]; + BatchWriterConfig cfg = new BatchWriterConfig(); + cfg.setMaxWriteThreads(10); + cfg.setMaxLatency(1, TimeUnit.SECONDS); + cfg.setMaxMemory(1024*1024); + long realSyncs = getSyncs(); + BatchWriter bw = c.createBatchWriter(tableName, cfg); + long now = System.currentTimeMillis(); + long bytesSent = 0; + for (int i = 0; i < N; i++) { + random.nextBytes(row); + Mutation m = new Mutation(row); + m.put("", "", ""); + bw.addMutation(m); + bytesSent += m.estimatedMemoryUsed(); + } + bw.close(); + long diff = System.currentTimeMillis() - now; + double secs = diff / 1000.; + double syncs = bytesSent / SMALL_QUEUE_SIZE; + double syncsPerSec = syncs / secs; + System.out.println(String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", bytesSent, secs, ((long)syncs), syncsPerSec)); + long update = getSyncs(); + System.out.println("Syncs " + (update - realSyncs)); + realSyncs = update; + + // Now with a much bigger total queue + c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + LARGE_QUEUE_SIZE); + c.tableOperations().flush(tableName, null, null, true); + UtilWaitThread.sleep(1000); + bw = c.createBatchWriter(tableName, cfg); + now = System.currentTimeMillis(); + bytesSent = 0; + for (int i = 0; i < N; i++) { + random.nextBytes(row); + Mutation m = new Mutation(row); + m.put("", "", ""); + bw.addMutation(m); + bytesSent += m.estimatedMemoryUsed(); + } + bw.close(); + diff = System.currentTimeMillis() - now; + secs = diff / 1000.; + syncs = bytesSent / LARGE_QUEUE_SIZE; + syncsPerSec = syncs / secs; + System.out.println(String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", bytesSent, secs, ((long)syncs), syncsPerSec)); + update = getSyncs(); + System.out.println("Syncs " + (update - realSyncs)); + assertTrue(update - realSyncs < realSyncs); + } + + private long getSyncs() throws Exception { + Connector c = getConnector(); + Credentials credentials = new Credentials("root", new PasswordToken(ROOT_PASSWORD.getBytes())); + for (String address : c.instanceOperations().getTabletServers()) { + TabletClientService.Client client = ThriftUtil.getTServerClient(address, DefaultConfiguration.getDefaultConfiguration()); + TabletServerStatus status = client.getTabletServerStatus(null, credentials.toThrift(c.getInstance())); + return status.syncs; + } + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef701cab/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java index 8f6b830..6ee671e 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/BloomFilterIT.java @@ -57,7 +57,7 @@ public class BloomFilterIT extends ConfigurableMacIT { siteConfig.put(Property.TABLE_BLOOM_SIZE.getKey(), "2000000"); siteConfig.put(Property.TABLE_BLOOM_ERRORRATE.getKey(), "1%"); siteConfig.put(Property.TABLE_BLOOM_LOAD_THRESHOLD.getKey(), "0"); - 
siteConfig.put(Property.TSERV_MUTATION_QUEUE_MAX.getKey(), "10M"); + siteConfig.put(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "10M"); siteConfig.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64K"); cfg.setSiteConfig(siteConfig ); }
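
Editor's note on the core of the change: ACCUMULO-1950 replaces the per-session write-ahead-log buffer limit (tserver.mutation.queue.max, now deprecated) with a single server-wide budget (tserver.total.mutation.queue.max, default 50M). In TabletServer.java the diff adds an AtomicLong that accumulates queued mutation bytes across all write sessions and triggers a flush when the configured total is exceeded. The following is a minimal sketch of that accounting pattern only; the class and method names below are illustrative, not the actual Accumulo classes.

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative sketch: one server-wide byte budget for queued mutations,
    // shared by all write sessions, instead of a per-session limit.
    class QueuedMutationBudget {
      private final AtomicLong totalQueuedBytes = new AtomicLong(0);
      private final long maxQueuedBytes; // e.g. the configured tserver.total.mutation.queue.max

      QueuedMutationBudget(long maxQueuedBytes) {
        this.maxQueuedBytes = maxQueuedBytes;
      }

      /** Account for newly queued mutation bytes; returns true if the caller should flush now. */
      boolean add(long additionalBytes) {
        return totalQueuedBytes.addAndGet(additionalBytes) > maxQueuedBytes;
      }

      /** Called after a session's queued mutations have been written out. */
      void release(long flushedBytes) {
        totalQueuedBytes.addAndGet(-flushedBytes);
      }
    }

Because the limit is now per server rather than per session, many concurrent writers no longer multiply the buffered memory. The new TotalQueuedIT test in the diff tunes the property at runtime the same way an operator could, e.g. c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "50M").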
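The commit also adds flushs and syncs fields to the TabletServerStatus thrift struct, fed by two AtomicLong counters that TabletServer passes through TabletServerLogger into each DfsLogger; DfsLogger increments one or the other depending on whether the WAL durability call was a sync or a flush. A hedged sketch of that counting pattern, with hypothetical names, is shown below.

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative sketch: counting WAL durability calls so they can be
    // reported in the tablet server status (the new flushs/syncs fields).
    class WalDurabilityCounters {
      final AtomicLong syncs = new AtomicLong();   // hsync()-style calls
      final AtomicLong flushes = new AtomicLong(); // hflush()-style calls

      void record(boolean wasSync) {
        if (wasSync) {
          syncs.incrementAndGet();
        } else {
          flushes.incrementAndGet();
        }
      }
    }

TotalQueuedIT reads the syncs value back over thrift via getTabletServerStatus() to confirm that the larger server-wide queue produces fewer WAL syncs for the same volume of writes.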