Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1CAFB109B5 for ; Mon, 27 Jan 2014 22:40:06 +0000 (UTC) Received: (qmail 26828 invoked by uid 500); 27 Jan 2014 22:40:03 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 26771 invoked by uid 500); 27 Jan 2014 22:40:03 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 26735 invoked by uid 99); 27 Jan 2014 22:40:02 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Jan 2014 22:40:02 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id F41D39077E0; Mon, 27 Jan 2014 22:40:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aleksey@apache.org To: commits@cassandra.apache.org Date: Mon, 27 Jan 2014 22:40:03 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/3] git commit: New counters implementation New counters implementation patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for CASSANDRA-6504 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/714c4233 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/714c4233 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/714c4233 Branch: refs/heads/trunk Commit: 714c423360c36da2a2b365efaf9c5c4f623ed133 Parents: 1218bca Author: Aleksey Yeschenko Authored: Fri Jan 24 02:52:43 2014 -0800 Committer: Aleksey Yeschenko Committed: Mon Jan 27 16:37:41 2014 
-0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 5 + conf/cassandra.yaml | 33 ++- doc/cql3/CQL.textile | 1 - interface/cassandra.thrift | 3 +- pylib/cqlshlib/cql3handling.py | 1 - pylib/cqlshlib/test/test_cqlsh_output.py | 1 - .../apache/cassandra/cache/AutoSavingCache.java | 3 +- .../apache/cassandra/cache/CounterCacheKey.java | 89 ++++++++ .../org/apache/cassandra/concurrent/Stage.java | 6 +- .../cassandra/concurrent/StageManager.java | 14 +- .../org/apache/cassandra/config/CFMetaData.java | 20 -- .../org/apache/cassandra/config/Config.java | 14 +- .../cassandra/config/DatabaseDescriptor.java | 75 ++++++- .../cassandra/cql/AlterTableStatement.java | 1 - .../org/apache/cassandra/cql/CFPropDefs.java | 3 +- .../cql/CreateColumnFamilyStatement.java | 1 - .../cql3/statements/BatchStatement.java | 7 +- .../cassandra/cql3/statements/CFPropDefs.java | 5 +- .../cql3/statements/ModificationStatement.java | 6 +- src/java/org/apache/cassandra/db/Cell.java | 13 +- .../org/apache/cassandra/db/ClockAndCount.java | 73 +++++++ .../apache/cassandra/db/ColumnFamilyStore.java | 89 ++++++-- .../apache/cassandra/db/ConsistencyLevel.java | 11 +- .../org/apache/cassandra/db/CounterCell.java | 37 +--- .../apache/cassandra/db/CounterMutation.java | 210 +++++++++++++----- .../apache/cassandra/db/CounterUpdateCell.java | 19 +- .../org/apache/cassandra/db/DeletedCell.java | 7 - .../org/apache/cassandra/db/ExpiringCell.java | 7 - src/java/org/apache/cassandra/db/IMutation.java | 2 +- src/java/org/apache/cassandra/db/Mutation.java | 6 + .../org/apache/cassandra/db/SystemKeyspace.java | 21 -- .../db/compaction/CompactionManager.java | 23 +- .../cassandra/db/context/CounterContext.java | 214 +++++++++---------- .../apache/cassandra/db/context/IContext.java | 75 ------- .../io/sstable/AbstractSSTableSimpleWriter.java | 2 +- .../apache/cassandra/net/MessagingService.java | 2 +- .../service/AbstractWriteResponseHandler.java | 7 +- 
.../apache/cassandra/service/CacheService.java | 127 ++++++++++- .../cassandra/service/CacheServiceMBean.java | 10 + .../apache/cassandra/service/StorageProxy.java | 38 ++-- .../cassandra/service/StorageProxyMBean.java | 2 + .../cassandra/service/StorageService.java | 18 +- .../cassandra/thrift/CassandraServer.java | 7 +- .../org/apache/cassandra/tools/NodeProbe.java | 15 +- .../org/apache/cassandra/tools/NodeTool.java | 42 +++- .../org/apache/cassandra/utils/CounterId.java | 106 ++------- .../unit/org/apache/cassandra/SchemaLoader.java | 8 + test/unit/org/apache/cassandra/Util.java | 4 +- .../org/apache/cassandra/config/DefsTest.java | 1 - .../org/apache/cassandra/db/CleanupTest.java | 5 +- .../cassandra/db/ColumnFamilyStoreTest.java | 7 +- .../apache/cassandra/db/CounterCacheTest.java | 96 +++++++++ .../apache/cassandra/db/CounterCellTest.java | 44 ++-- .../cassandra/db/CounterMutationTest.java | 180 ++++++++++++++-- .../cassandra/db/RecoveryManagerTest.java | 2 +- .../db/context/CounterContextTest.java | 92 ++++++-- .../cassandra/tools/SSTableExportTest.java | 3 +- .../apache/cassandra/utils/CounterIdTest.java | 51 +++++ 59 files changed, 1333 insertions(+), 632 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 10548fa..cc406c8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -25,6 +25,7 @@ * CF id is changed to be non-deterministic. 
Data dir/key cache are created uniquely for CF id (CASSANDRA-5202) * Cassandra won't start by default without jna (CASSANDRA-6575) + * New counters implementation (CASSANDRA-6504) 2.0.5 http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 3070500..4e00faf 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -38,6 +38,11 @@ Upgrading cold_reads_to_omit compaction option; 0.0 omits nothing (the old behavior) and 1.0 omits everything. - Multithreaded compaction has been removed. + - Counters implementation has been changed, replaced by a safer one with + fewer caveats, but different performance characteristics. You might have + to change your data model to accommodate the new implementation. + (See https://issues.apache.org/jira/browse/CASSANDRA-6504 and the dev + blog post at http://www.datastax.com/dev/blog/ for details). 2.0.5 ===== http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index bdbb9ff..06cb33f 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -171,6 +171,32 @@ row_cache_save_period: 0 # Disabled by default, meaning all keys are going to be saved # row_cache_keys_to_save: 100 +# Maximum size of the counter cache in memory. +# +# Counter cache helps to reduce counter locks' contention for hot counter cells. +# In case of RF = 1 a counter cache hit will cause Cassandra to skip the read before +# write entirely. With RF > 1 a counter cache hit will still help to reduce the duration +# of the lock hold, helping with hot counter cell updates, but will not allow skipping +# the read entirely. Only the local (clock, count) tuple of a counter cell is kept +# in memory, not the whole counter, so it's relatively cheap.
+# +# NOTE: if you reduce the size, you may not get your hottest keys loaded on startup. +# +# Default value is empty to make it "auto" (min(2.5% of Heap (in MB), 50MB)). Set to 0 to disable counter cache. +# NOTE: if you perform counter deletes and rely on low gcgs, you should disable the counter cache. +counter_cache_size_in_mb: + +# Duration in seconds after which Cassandra should +# save the counter cache (keys only). Caches are saved to saved_caches_directory as +# specified in this configuration file. +# +# Default is 7200 or 2 hours. +counter_cache_save_period: 7200 + +# Number of keys from the counter cache to save +# Disabled by default, meaning all keys are going to be saved +# counter_cache_keys_to_save: 100 + # The off-heap memory allocator. Affects storage engine metadata as # well as caches. Experiments show that JEMAlloc saves some memory # than the native GCC allocator (i.e., JEMalloc is more @@ -234,13 +260,16 @@ seed_provider: # bottleneck will be reads that need to fetch data from # disk. "concurrent_reads" should be set to (16 * number_of_drives) in # order to allow the operations to enqueue low enough in the stack -# that the OS and drives can reorder them. +# that the OS and drives can reorder them. Same applies to +# "concurrent_counter_writes", since counter writes read the current +# values before incrementing and writing them back. # # On the other hand, since writes are almost never IO bound, the ideal # number of "concurrent_writes" is dependent on the number of cores in # your system; (8 * number_of_cores) is a good rule of thumb. concurrent_reads: 32 concurrent_writes: 32 +concurrent_counter_writes: 32 # Total memory to use for sstable-reading buffers. Defaults to # the smaller of 1/4 of heap or 512MB.
@@ -491,6 +520,8 @@ read_request_timeout_in_ms: 5000 range_request_timeout_in_ms: 10000 # How long the coordinator should wait for writes to complete write_request_timeout_in_ms: 2000 +# How long the coordinator should wait for counter writes to complete +counter_write_request_timeout_in_ms: 5000 # How long a coordinator should continue to retry a CAS operation # that contends with other proposals for the same row cas_contention_timeout_in_ms: 1000 http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/doc/cql3/CQL.textile ---------------------------------------------------------------------- diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile index 5fff402..6d68584 100644 --- a/doc/cql3/CQL.textile +++ b/doc/cql3/CQL.textile @@ -308,7 +308,6 @@ Table creation supports the following other @@: |@bloom_filter_fp_chance@ | _simple_ | 0.00075 | The target probability of false positive of the sstable bloom filters. Said bloom filters will be sized to provide the provided probability (thus lowering this value impact the size of bloom filters in-memory and on-disk)| |@compaction@ | _map_ | _see below_ | The compaction options to use, see below.| |@compression@ | _map_ | _see below_ | Compression options, see below. | -|@replicate_on_write@ | _simple_ | true | Whether to replicate data on write. This can only be set to false for tables with counters values. Disabling this is dangerous and can result in random lose of counters, don't disable unless you are sure to know what you are doing| |@caching@ | _simple_ | keys_only | Whether to cache keys ("key cache") and/or rows ("row cache") for this table. Valid values are: @all@, @keys_only@, @rows_only@ and @none@. 
| http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/interface/cassandra.thrift ---------------------------------------------------------------------- diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift index 780ffc7..289be1f 100644 --- a/interface/cassandra.thrift +++ b/interface/cassandra.thrift @@ -459,7 +459,6 @@ struct CfDef { 16: optional i32 id, 17: optional i32 min_compaction_threshold, 18: optional i32 max_compaction_threshold, - 24: optional bool replicate_on_write, 26: optional string key_validation_class, 28: optional binary key_alias, 29: optional string compaction_strategy, @@ -492,6 +491,8 @@ struct CfDef { /** @deprecated */ 23: optional double memtable_operations_in_millions, /** @deprecated */ + 24: optional bool replicate_on_write, + /** @deprecated */ 25: optional double merge_shards_chance, /** @deprecated */ 27: optional string row_cache_provider, http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/pylib/cqlshlib/cql3handling.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py index 421ab27..c4fa97d 100644 --- a/pylib/cqlshlib/cql3handling.py +++ b/pylib/cqlshlib/cql3handling.py @@ -68,7 +68,6 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet): ('gc_grace_seconds', None), ('index_interval', None), ('read_repair_chance', None), - ('replicate_on_write', None), ('populate_io_cache_on_flush', None), ('default_time_to_live', None), ('speculative_retry', None), http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/pylib/cqlshlib/test/test_cqlsh_output.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/test/test_cqlsh_output.py b/pylib/cqlshlib/test/test_cqlsh_output.py index f89127d..102a040 100644 --- a/pylib/cqlshlib/test/test_cqlsh_output.py +++ b/pylib/cqlshlib/test/test_cqlsh_output.py @@ -661,7 +661,6 @@ class TestCqlshOutput(BaseTestCase): 
gc_grace_seconds=864000 AND index_interval=128 AND read_repair_chance=0.100000 AND - replicate_on_write='true' AND populate_io_cache_on_flush='false' AND default_time_to_live=0 AND speculative_retry='NONE' AND http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 3ed2c2c..f94999e 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -127,7 +127,8 @@ public class AutoSavingCache extends InstrumentingCache> future : futures) { Pair entry = future.get(); - put(entry.left, entry.right); + if (entry != null) + put(entry.left, entry.right); } } catch (Exception e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cache/CounterCacheKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/CounterCacheKey.java b/src/java/org/apache/cassandra/cache/CounterCacheKey.java new file mode 100644 index 0000000..acbe323 --- /dev/null +++ b/src/java/org/apache/cassandra/cache/CounterCacheKey.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cache; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.UUID; + +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.utils.*; + +public class CounterCacheKey implements CacheKey +{ + public final UUID cfId; + public final byte[] partitionKey; + public final byte[] cellName; + + private CounterCacheKey(UUID cfId, ByteBuffer partitionKey, CellName cellName) + { + this.cfId = cfId; + this.partitionKey = ByteBufferUtil.getArray(partitionKey); + this.cellName = ByteBufferUtil.getArray(cellName.toByteBuffer()); + } + + public static CounterCacheKey create(UUID cfId, ByteBuffer partitionKey, CellName cellName) + { + return new CounterCacheKey(cfId, partitionKey, cellName); + } + + public PathInfo getPathInfo() + { + Pair cf = Schema.instance.getCF(cfId); + return new PathInfo(cf.left, cf.right, cfId); + } + + public long memorySize() + { + return ObjectSizes.getFieldSize(3 * ObjectSizes.getReferenceSize()) + + ObjectSizes.getArraySize(partitionKey) + + ObjectSizes.getArraySize(cellName); + } + + @Override + public String toString() + { + return String.format("CounterCacheKey(%s, %s, %s)", + cfId, + ByteBufferUtil.bytesToHex(ByteBuffer.wrap(partitionKey)), + ByteBufferUtil.bytesToHex(ByteBuffer.wrap(cellName))); + } + + @Override + public int hashCode() + { + return Arrays.deepHashCode(new Object[]{cfId, partitionKey, cellName}); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + + if (!(o 
instanceof CounterCacheKey)) + return false; + + CounterCacheKey cck = (CounterCacheKey) o; + + return cfId.equals(cck.cfId) + && Arrays.equals(partitionKey, cck.partitionKey) + && Arrays.equals(cellName, cck.cellName); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/concurrent/Stage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java b/src/java/org/apache/cassandra/concurrent/Stage.java index f2907e2..6192cab 100644 --- a/src/java/org/apache/cassandra/concurrent/Stage.java +++ b/src/java/org/apache/cassandra/concurrent/Stage.java @@ -21,6 +21,7 @@ public enum Stage { READ, MUTATION, + COUNTER_MUTATION, GOSSIP, REQUEST_RESPONSE, ANTI_ENTROPY, @@ -28,8 +29,7 @@ public enum Stage MISC, TRACING, INTERNAL_RESPONSE, - READ_REPAIR, - REPLICATE_ON_WRITE; + READ_REPAIR; public String getJmxType() { @@ -43,9 +43,9 @@ public enum Stage case INTERNAL_RESPONSE: return "internal"; case MUTATION: + case COUNTER_MUTATION: case READ: case REQUEST_RESPONSE: - case REPLICATE_ON_WRITE: case READ_REPAIR: return "request"; default: http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/concurrent/StageManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java index 2960f22..512d64a 100644 --- a/src/java/org/apache/cassandra/concurrent/StageManager.java +++ b/src/java/org/apache/cassandra/concurrent/StageManager.java @@ -43,15 +43,13 @@ public class StageManager public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle - public static final int MAX_REPLICATE_ON_WRITE_TASKS = 1024 * FBUtilities.getAvailableProcessors(); - static { stages.put(Stage.MUTATION, multiThreadedConfigurableStage(Stage.MUTATION, 
getConcurrentWriters())); + stages.put(Stage.COUNTER_MUTATION, multiThreadedConfigurableStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters())); stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, getConcurrentReaders())); stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors())); stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, FBUtilities.getAvailableProcessors())); - stages.put(Stage.REPLICATE_ON_WRITE, multiThreadedConfigurableStage(Stage.REPLICATE_ON_WRITE, getConcurrentReplicators(), MAX_REPLICATE_ON_WRITE_TASKS)); // the rest are all single-threaded stages.put(Stage.GOSSIP, new JMXEnabledThreadPoolExecutor(Stage.GOSSIP)); stages.put(Stage.ANTI_ENTROPY, new JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY)); @@ -99,16 +97,6 @@ public class StageManager stage.getJmxType()); } - private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock) - { - return new JMXConfigurableThreadPoolExecutor(numThreads, - KEEPALIVE, - TimeUnit.SECONDS, - new LinkedBlockingQueue(maxTasksBeforeBlock), - new NamedThreadFactory(stage.getJmxName()), - stage.getJmxType()); - } - /** * Retrieve a stage from the StageManager * @param stage name of the stage to be retrieved. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 817d4a3..f377734 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -76,7 +76,6 @@ public final class CFMetaData public final static double DEFAULT_READ_REPAIR_CHANCE = 0.1; public final static double DEFAULT_DCLOCAL_READ_REPAIR_CHANCE = 0.0; - public final static boolean DEFAULT_REPLICATE_ON_WRITE = true; public final static int DEFAULT_GC_GRACE_SECONDS = 864000; public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4; public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32; @@ -128,7 +127,6 @@ public final class CFMetaData + "comment text," + "read_repair_chance double," + "local_read_repair_chance double," - + "replicate_on_write boolean," + "gc_grace_seconds int," + "default_validator text," + "key_validator text," @@ -395,7 +393,6 @@ public final class CFMetaData private volatile String comment = ""; private volatile double readRepairChance = DEFAULT_READ_REPAIR_CHANCE; private volatile double dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE; - private volatile boolean replicateOnWrite = DEFAULT_REPLICATE_ON_WRITE; private volatile int gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS; private volatile AbstractType defaultValidator = BytesType.instance; private volatile AbstractType keyValidator = BytesType.instance; @@ -437,7 +434,6 @@ public final class CFMetaData public CFMetaData comment(String prop) { comment = enforceCommentNotNull(prop); return this;} public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;} public CFMetaData dcLocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;} - public 
CFMetaData replicateOnWrite(boolean prop) {replicateOnWrite = prop; return this;} public CFMetaData gcGraceSeconds(int prop) {gcGraceSeconds = prop; return this;} public CFMetaData defaultValidator(AbstractType prop) {defaultValidator = prop; return this;} public CFMetaData keyValidator(AbstractType prop) {keyValidator = prop; return this;} @@ -624,7 +620,6 @@ public final class CFMetaData .comment(oldCFMD.comment) .readRepairChance(oldCFMD.readRepairChance) .dcLocalReadRepairChance(oldCFMD.dcLocalReadRepairChance) - .replicateOnWrite(oldCFMD.replicateOnWrite) .gcGraceSeconds(oldCFMD.gcGraceSeconds) .defaultValidator(oldCFMD.defaultValidator) .keyValidator(oldCFMD.keyValidator) @@ -691,11 +686,6 @@ public final class CFMetaData return ReadRepairDecision.NONE; } - public boolean getReplicateOnWrite() - { - return replicateOnWrite; - } - public boolean populateIoCacheOnFlush() { return populateIoCacheOnFlush; @@ -869,7 +859,6 @@ public final class CFMetaData .append(comment, rhs.comment) .append(readRepairChance, rhs.readRepairChance) .append(dcLocalReadRepairChance, rhs.dcLocalReadRepairChance) - .append(replicateOnWrite, rhs.replicateOnWrite) .append(gcGraceSeconds, rhs.gcGraceSeconds) .append(defaultValidator, rhs.defaultValidator) .append(keyValidator, rhs.keyValidator) @@ -902,7 +891,6 @@ public final class CFMetaData .append(comment) .append(readRepairChance) .append(dcLocalReadRepairChance) - .append(replicateOnWrite) .append(gcGraceSeconds) .append(defaultValidator) .append(keyValidator) @@ -956,8 +944,6 @@ public final class CFMetaData { if (!cf_def.isSetComment()) cf_def.setComment(""); - if (!cf_def.isSetReplicate_on_write()) - cf_def.setReplicate_on_write(CFMetaData.DEFAULT_REPLICATE_ON_WRITE); if (!cf_def.isSetPopulate_io_cache_on_flush()) cf_def.setPopulate_io_cache_on_flush(CFMetaData.DEFAULT_POPULATE_IO_CACHE_ON_FLUSH); if (!cf_def.isSetMin_compaction_threshold()) @@ -1047,7 +1033,6 @@ public final class CFMetaData return 
newCFMD.addAllColumnDefinitions(ColumnDefinition.fromThrift(newCFMD, cf_def.column_metadata)) .comment(cf_def.comment) - .replicateOnWrite(cf_def.replicate_on_write) .defaultValidator(TypeParser.parse(cf_def.default_validation_class)) .compressionParameters(cp) .rebuild(); @@ -1125,7 +1110,6 @@ public final class CFMetaData comment = enforceCommentNotNull(cfm.comment); readRepairChance = cfm.readRepairChance; dcLocalReadRepairChance = cfm.dcLocalReadRepairChance; - replicateOnWrite = cfm.replicateOnWrite; gcGraceSeconds = cfm.gcGraceSeconds; defaultValidator = cfm.defaultValidator; keyValidator = cfm.keyValidator; @@ -1265,7 +1249,6 @@ public final class CFMetaData def.setComment(enforceCommentNotNull(comment)); def.setRead_repair_chance(readRepairChance); def.setDclocal_read_repair_chance(dcLocalReadRepairChance); - def.setReplicate_on_write(replicateOnWrite); def.setPopulate_io_cache_on_flush(populateIoCacheOnFlush); def.setGc_grace_seconds(gcGraceSeconds); def.setDefault_validation_class(defaultValidator == null ? 
null : defaultValidator.toString()); @@ -1628,7 +1611,6 @@ public final class CFMetaData adder.add("comment", comment); adder.add("read_repair_chance", readRepairChance); adder.add("local_read_repair_chance", dcLocalReadRepairChance); - adder.add("replicate_on_write", replicateOnWrite); adder.add("populate_io_cache_on_flush", populateIoCacheOnFlush); adder.add("gc_grace_seconds", gcGraceSeconds); adder.add("default_validator", defaultValidator.toString()); @@ -1692,7 +1674,6 @@ public final class CFMetaData cfm.readRepairChance(result.getDouble("read_repair_chance")); cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance")); - cfm.replicateOnWrite(result.getBoolean("replicate_on_write")); cfm.gcGraceSeconds(result.getInt("gc_grace_seconds")); cfm.defaultValidator(TypeParser.parse(result.getString("default_validator"))); cfm.keyValidator(TypeParser.parse(result.getString("key_validator"))); @@ -2172,7 +2153,6 @@ public final class CFMetaData .append("comment", comment) .append("readRepairChance", readRepairChance) .append("dclocalReadRepairChance", dcLocalReadRepairChance) - .append("replicateOnWrite", replicateOnWrite) .append("gcGraceSeconds", gcGraceSeconds) .append("defaultValidator", defaultValidator) .append("keyValidator", keyValidator) http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 2ea8e38..5a944a2 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -58,6 +58,8 @@ public class Config public volatile Long write_request_timeout_in_ms = 2000L; + public volatile Long counter_write_request_timeout_in_ms = 5000L; + public volatile Long cas_contention_timeout_in_ms = 1000L; public volatile Long truncate_request_timeout_in_ms = 
60000L; @@ -70,7 +72,10 @@ public class Config public Integer concurrent_reads = 32; public Integer concurrent_writes = 32; - public Integer concurrent_replicates = 32; + public Integer concurrent_counter_writes = 32; + + @Deprecated + public Integer concurrent_replicates = null; public Integer memtable_flush_writers = null; // will get set to the length of data dirs in DatabaseDescriptor public Integer memtable_total_space_in_mb; @@ -165,7 +170,12 @@ public class Config public long row_cache_size_in_mb = 0; public volatile int row_cache_save_period = 0; - public int row_cache_keys_to_save = Integer.MAX_VALUE; + public volatile int row_cache_keys_to_save = Integer.MAX_VALUE; + + public Long counter_cache_size_in_mb = null; + public volatile int counter_cache_save_period = 7200; + public volatile int counter_cache_keys_to_save = Integer.MAX_VALUE; + public String memory_allocator = NativeAllocator.class.getSimpleName(); public boolean populate_io_cache_on_flush = false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 4b627c8..eca8881 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -86,6 +86,7 @@ public class DatabaseDescriptor private static RequestSchedulerOptions requestSchedulerOptions; private static long keyCacheSizeInMB; + private static long counterCacheSizeInMB; private static IAllocator memoryAllocator; private static long indexSummaryCapacityInMB; @@ -248,10 +249,11 @@ public class DatabaseDescriptor throw new ConfigurationException("concurrent_writes must be at least 2"); } - if (conf.concurrent_replicates != null && conf.concurrent_replicates < 2) - { - throw new 
ConfigurationException("concurrent_replicates must be at least 2"); - } + if (conf.concurrent_counter_writes != null && conf.concurrent_counter_writes < 2) + throw new ConfigurationException("concurrent_counter_writes must be at least 2"); + + if (conf.concurrent_replicates != null) + logger.warn("concurrent_replicates has been deprecated and should be removed from cassandra.yaml"); if (conf.file_cache_size_in_mb == null) conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576))); @@ -446,6 +448,22 @@ public class DatabaseDescriptor + conf.key_cache_size_in_mb + "', supported values are >= 0."); } + try + { + // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB) + counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null) + ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50) + : conf.counter_cache_size_in_mb; + + if (counterCacheSizeInMB < 0) + throw new NumberFormatException(); // to escape duplicating error message + } + catch (NumberFormatException e) + { + throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '" + + conf.counter_cache_size_in_mb + "', supported values are >= 0."); + } + // if set to empty/"auto" then use 5% of Heap size indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null) ? 
Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)) @@ -780,6 +798,16 @@ public class DatabaseDescriptor conf.write_request_timeout_in_ms = timeOutInMillis; } + public static long getCounterWriteRpcTimeout() + { + return conf.counter_write_request_timeout_in_ms; + } + + public static void setCounterWriteRpcTimeout(Long timeOutInMillis) + { + conf.counter_write_request_timeout_in_ms = timeOutInMillis; + } + public static long getCasContentionTimeout() { return conf.cas_contention_timeout_in_ms; @@ -818,8 +846,9 @@ public class DatabaseDescriptor return getTruncateRpcTimeout(); case READ_REPAIR: case MUTATION: - case COUNTER_MUTATION: return getWriteRpcTimeout(); + case COUNTER_MUTATION: + return getCounterWriteRpcTimeout(); default: return getRpcTimeout(); } @@ -830,7 +859,12 @@ public class DatabaseDescriptor */ public static long getMinRpcTimeout() { - return Longs.min(getRpcTimeout(), getReadRpcTimeout(), getRangeRpcTimeout(), getWriteRpcTimeout(), getTruncateRpcTimeout()); + return Longs.min(getRpcTimeout(), + getReadRpcTimeout(), + getRangeRpcTimeout(), + getWriteRpcTimeout(), + getCounterWriteRpcTimeout(), + getTruncateRpcTimeout()); } public static double getPhiConvictThreshold() @@ -853,9 +887,9 @@ public class DatabaseDescriptor return conf.concurrent_writes; } - public static int getConcurrentReplicators() + public static int getConcurrentCounterWriters() { - return conf.concurrent_replicates; + return conf.concurrent_counter_writes; } public static int getFlushWriters() @@ -1283,6 +1317,31 @@ public class DatabaseDescriptor return conf.row_cache_keys_to_save; } + public static long getCounterCacheSizeInMB() + { + return counterCacheSizeInMB; + } + + public static int getCounterCacheSavePeriod() + { + return conf.counter_cache_save_period; + } + + public static void setCounterCacheSavePeriod(int counterCacheSavePeriod) + { + conf.counter_cache_save_period = counterCacheSavePeriod; + } + + public static int 
getCounterCacheKeysToSave() + { + return conf.counter_cache_keys_to_save; + } + + public static void setCounterCacheKeysToSave(int counterCacheKeysToSave) + { + conf.counter_cache_keys_to_save = counterCacheKeysToSave; + } + public static IAllocator getoffHeapMemoryAllocator() { return memoryAllocator; http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql/AlterTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java index 0d767b2..b5ac464 100644 --- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java @@ -173,7 +173,6 @@ public class AlterTableStatement cfm.readRepairChance(cfProps.getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, cfm.getReadRepairChance())); cfm.dcLocalReadRepairChance(cfProps.getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepair())); cfm.gcGraceSeconds(cfProps.getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, cfm.getGcGraceSeconds())); - cfm.replicateOnWrite(cfProps.getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, cfm.getReplicateOnWrite())); int minCompactionThreshold = cfProps.getPropertyInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, cfm.getMinCompactionThreshold()); int maxCompactionThreshold = cfProps.getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, cfm.getMaxCompactionThreshold()); if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql/CFPropDefs.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/CFPropDefs.java b/src/java/org/apache/cassandra/cql/CFPropDefs.java index a7d3147..2131d06 100644 --- a/src/java/org/apache/cassandra/cql/CFPropDefs.java +++ 
b/src/java/org/apache/cassandra/cql/CFPropDefs.java @@ -47,7 +47,6 @@ public class CFPropDefs { public static final String KW_DEFAULTVALIDATION = "default_validation"; public static final String KW_MINCOMPACTIONTHRESHOLD = "min_compaction_threshold"; public static final String KW_MAXCOMPACTIONTHRESHOLD = "max_compaction_threshold"; - public static final String KW_REPLICATEONWRITE = "replicate_on_write"; public static final String KW_COMPACTION_STRATEGY_CLASS = "compaction_strategy_class"; public static final String KW_CACHING = "caching"; public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live"; @@ -90,7 +89,6 @@ public class CFPropDefs { keywords.add(KW_DEFAULTVALIDATION); keywords.add(KW_MINCOMPACTIONTHRESHOLD); keywords.add(KW_MAXCOMPACTIONTHRESHOLD); - keywords.add(KW_REPLICATEONWRITE); keywords.add(KW_COMPACTION_STRATEGY_CLASS); keywords.add(KW_CACHING); keywords.add(KW_DEFAULT_TIME_TO_LIVE); @@ -107,6 +105,7 @@ public class CFPropDefs { obsoleteKeywords.add("memtable_operations_in_millions"); obsoleteKeywords.add("memtable_flush_after_mins"); obsoleteKeywords.add("row_cache_provider"); + obsoleteKeywords.add("replicate_on_write"); allowedKeywords.addAll(keywords); allowedKeywords.addAll(obsoleteKeywords); http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java index a71707c..7b5cbaf 100644 --- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java +++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java @@ -188,7 +188,6 @@ public class CreateColumnFamilyStatement .comment(cfProps.getProperty(CFPropDefs.KW_COMMENT)) .readRepairChance(getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, CFMetaData.DEFAULT_READ_REPAIR_CHANCE)) 
.dcLocalReadRepairChance(getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE)) - .replicateOnWrite(getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, CFMetaData.DEFAULT_REPLICATE_ON_WRITE)) .gcGraceSeconds(getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, CFMetaData.DEFAULT_GC_GRACE_SECONDS)) .defaultValidator(cfProps.getValidator()) .minCompactionThreshold(minCompactionThreshold) http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 25f61fb..e500359 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -25,6 +25,7 @@ import org.github.jamm.MemoryMeter; import org.apache.cassandra.cql3.*; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.Mutation; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; @@ -174,7 +175,11 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException { for (IMutation mutation : getMutations(Collections.emptyList(), true, null, queryState.getTimestamp())) - mutation.apply(); + { + // We don't use counters internally. 
+ assert mutation instanceof Mutation; + ((Mutation) mutation).apply(); + } return null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java index 6ce6406..3929f3c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java +++ b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java @@ -34,7 +34,6 @@ public class CFPropDefs extends PropertyDefinitions public static final String KW_GCGRACESECONDS = "gc_grace_seconds"; public static final String KW_MINCOMPACTIONTHRESHOLD = "min_threshold"; public static final String KW_MAXCOMPACTIONTHRESHOLD = "max_threshold"; - public static final String KW_REPLICATEONWRITE = "replicate_on_write"; public static final String KW_CACHING = "caching"; public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live"; public static final String KW_INDEX_INTERVAL = "index_interval"; @@ -57,7 +56,6 @@ public class CFPropDefs extends PropertyDefinitions keywords.add(KW_READREPAIRCHANCE); keywords.add(KW_DCLOCALREADREPAIRCHANCE); keywords.add(KW_GCGRACESECONDS); - keywords.add(KW_REPLICATEONWRITE); keywords.add(KW_CACHING); keywords.add(KW_DEFAULT_TIME_TO_LIVE); keywords.add(KW_INDEX_INTERVAL); @@ -67,6 +65,8 @@ public class CFPropDefs extends PropertyDefinitions keywords.add(KW_COMPACTION); keywords.add(KW_COMPRESSION); keywords.add(KW_MEMTABLE_FLUSH_PERIOD); + + obsoleteKeywords.add("replicate_on_write"); } private Class compactionStrategyClass = null; @@ -146,7 +146,6 @@ public class CFPropDefs extends PropertyDefinitions cfm.readRepairChance(getDouble(KW_READREPAIRCHANCE, cfm.getReadRepairChance())); cfm.dcLocalReadRepairChance(getDouble(KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepair())); 
cfm.gcGraceSeconds(getInt(KW_GCGRACESECONDS, cfm.getGcGraceSeconds())); - cfm.replicateOnWrite(getBoolean(KW_REPLICATEONWRITE, cfm.getReplicateOnWrite())); int minCompactionThreshold = toInt(KW_MINCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MINCOMPACTIONTHRESHOLD), cfm.getMinCompactionThreshold()); int maxCompactionThreshold = toInt(KW_MAXCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MAXCOMPACTIONTHRESHOLD), cfm.getMaxCompactionThreshold()); if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index c2a0080..ecbf4e1 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -479,7 +479,11 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF throw new UnsupportedOperationException(); for (IMutation mutation : getMutations(Collections.emptyList(), true, null, queryState.getTimestamp(), false)) - mutation.apply(); + { + // We don't use counters internally. 
+ assert mutation instanceof Mutation; + ((Mutation) mutation).apply(); + } return null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/Cell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Cell.java b/src/java/org/apache/cassandra/db/Cell.java index 3e04f9b..d3cf085 100644 --- a/src/java/org/apache/cassandra/db/Cell.java +++ b/src/java/org/apache/cassandra/db/Cell.java @@ -234,17 +234,13 @@ public class Cell implements OnDiskAtom { if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; Cell cell = (Cell)o; - if (timestamp != cell.timestamp) - return false; - if (!name.equals(cell.name)) - return false; - - return value.equals(cell.value); + return timestamp == cell.timestamp && name.equals(cell.name) && value.equals(cell.value); } @Override @@ -256,11 +252,6 @@ public class Cell implements OnDiskAtom return result; } - public Cell localCopy(ColumnFamilyStore cfs) - { - return localCopy(cfs, HeapAllocator.instance); - } - public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator) { return new Cell(name.copy(allocator), allocator.clone(value), timestamp); http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/ClockAndCount.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ClockAndCount.java b/src/java/org/apache/cassandra/db/ClockAndCount.java new file mode 100644 index 0000000..1678c8c --- /dev/null +++ b/src/java/org/apache/cassandra/db/ClockAndCount.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import com.google.common.base.Objects; + +import org.apache.cassandra.cache.IMeasurableMemory; +import org.apache.cassandra.utils.ObjectSizes; + +public class ClockAndCount implements IMeasurableMemory +{ + public static ClockAndCount BLANK = ClockAndCount.create(0L, 0L); + + public final long clock; + public final long count; + + private ClockAndCount(long clock, long count) + { + this.clock = clock; + this.count = count; + } + + public static ClockAndCount create(long clock, long count) + { + return new ClockAndCount(clock, count); + } + + public long memorySize() + { + return ObjectSizes.getFieldSize(TypeSizes.NATIVE.sizeof(clock)) + + ObjectSizes.getFieldSize(TypeSizes.NATIVE.sizeof(count)); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + + if (!(o instanceof ClockAndCount)) + return false; + + ClockAndCount other = (ClockAndCount) o; + return clock == other.clock && count == other.count; + } + + @Override + public int hashCode() + { + return Objects.hashCode(clock, count); + } + + @Override + public String toString() + { + return String.format("ClockAndCount(%s,%s)", clock, count); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index fe77d09..2237214 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -25,6 +25,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; import java.util.regex.Pattern; import javax.management.*; @@ -32,13 +33,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.*; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.Striped; import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.cache.IRowCacheEntry; -import org.apache.cassandra.cache.RowCacheKey; -import org.apache.cassandra.cache.RowCacheSentinel; +import org.apache.cassandra.cache.*; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.config.*; import org.apache.cassandra.config.CFMetaData.SpeculativeRetry; @@ -46,6 +46,7 @@ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.*; +import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.db.composites.Composite; @@ -105,6 +106,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public final ColumnFamilyMetrics metric; public volatile long sampleLatencyNanos; + private final Striped counterLocks = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentCounterWriters() * 128); + public 
void reload() { // metadata object has been mutated directly. make all the members jibe with new settings. @@ -332,9 +335,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean data.unreferenceSSTables(); indexManager.invalidate(); - for (RowCacheKey key : CacheService.instance.rowCache.getKeySet()) - if (key.cfId == metadata.cfId) - invalidateCachedRow(key); + invalidateCaches(); + } + + /** + * Obtain a lock for this CF's part of a counter mutation + * @param key the key for the CounterMutation + * @return the striped lock instance + */ + public Lock counterLockFor(ByteBuffer key) + { + assert metadata.isCounter(); + return counterLocks.get(key); } /** @@ -562,13 +574,29 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean int cachedRowsRead = CacheService.instance.rowCache.loadSaved(this); if (cachedRowsRead > 0) - logger.info("completed loading ({} ms; {} keys) row cache for {}.{}", + logger.info("Completed loading ({} ms; {} keys) row cache for {}.{}", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), cachedRowsRead, keyspace.getName(), name); } + public void initCounterCache() + { + if (!metadata.isCounter() || CacheService.instance.counterCache.getCapacity() == 0) + return; + + long start = System.nanoTime(); + + int cachedShardsRead = CacheService.instance.counterCache.loadSaved(this); + if (cachedShardsRead > 0) + logger.info("Completed loading ({} ms; {} shards) counter cache for {}.{}", + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), + cachedShardsRead, + keyspace.getName(), + name); + } + /** * See #{@code StorageService.loadNewSSTables(String, String)} for more info * @@ -1073,9 +1101,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return maxFile; } - public void forceCleanup(CounterId.OneShotRenewer renewer) throws ExecutionException, InterruptedException + public void forceCleanup() throws ExecutionException, InterruptedException { - 
CompactionManager.instance.performCleanup(ColumnFamilyStore.this, renewer); + CompactionManager.instance.performCleanup(ColumnFamilyStore.this); } public void scrub(boolean disableSnapshot) throws ExecutionException, InterruptedException @@ -1535,6 +1563,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (key.cfId == metadata.cfId && !Range.isInRanges(dk.token, ranges)) invalidateCachedRow(dk); } + + if (metadata.isCounter()) + { + for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet()) + { + DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey)); + if (key.cfId == metadata.cfId && !Range.isInRanges(dk.token, ranges)) + CacheService.instance.counterCache.remove(key); + } + } } public static abstract class AbstractScanIterator extends AbstractIterator implements CloseableIterator @@ -1886,6 +1924,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return cached == null || cached instanceof RowCacheSentinel ? null : (ColumnFamily) cached; } + private void invalidateCaches() + { + for (RowCacheKey key : CacheService.instance.rowCache.getKeySet()) + if (key.cfId == metadata.cfId) + invalidateCachedRow(key); + + if (metadata.isCounter()) + for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet()) + if (key.cfId == metadata.cfId) + CacheService.instance.counterCache.remove(key); + } + /** * @return true if @param key is contained in the row cache */ @@ -1908,6 +1958,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean invalidateCachedRow(new RowCacheKey(cfId, key)); } + public ClockAndCount getCachedCounter(ByteBuffer partitionKey, CellName cellName) + { + if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled. 
+ return null; + return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.cfId, partitionKey, cellName)); + } + + public void putCachedCounter(ByteBuffer partitionKey, CellName cellName, ClockAndCount clockAndCount) + { + if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled. + return; + CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.cfId, partitionKey, cellName), clockAndCount); + } + public void forceMajorCompaction() throws InterruptedException, ExecutionException { CompactionManager.instance.performMaximal(this); @@ -2017,12 +2081,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter); - logger.debug("cleaning out row cache"); - for (RowCacheKey key : CacheService.instance.rowCache.getKeySet()) - { - if (key.cfId == metadata.cfId) - invalidateCachedRow(key); - } + invalidateCaches(); } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/ConsistencyLevel.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java index 6d04314..c97e71f 100644 --- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@ -341,17 +341,10 @@ public enum ConsistencyLevel public void validateCounterForWrite(CFMetaData metadata) throws InvalidRequestException { if (this == ConsistencyLevel.ANY) - { throw new InvalidRequestException("Consistency level ANY is not yet supported for counter columnfamily " + metadata.cfName); - } - else if (!metadata.getReplicateOnWrite() && !(this == ConsistencyLevel.ONE || this == ConsistencyLevel.LOCAL_ONE)) - { - throw new InvalidRequestException("cannot achieve CL > CL.ONE without replicate_on_write on columnfamily " + 
metadata.cfName); - } - else if (isSerialConsistency()) - { + + if (isSerialConsistency()) throw new InvalidRequestException("Counter operations are inherently non-serializable"); - } } private void requireNetworkTopologyStrategy(String keyspaceName) throws InvalidRequestException http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/CounterCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java index 76949d4..426d876 100644 --- a/src/java/org/apache/cassandra/db/CounterCell.java +++ b/src/java/org/apache/cassandra/db/CounterCell.java @@ -25,7 +25,6 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.db.context.CounterContext; -import org.apache.cassandra.db.context.IContext.ContextRelationship; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.Allocator; @@ -40,16 +39,6 @@ public class CounterCell extends Cell private final long timestampOfLastDelete; - public CounterCell(CellName name, long value, long timestamp) - { - this(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp); - } - - public CounterCell(CellName name, long value, long timestamp, long timestampOfLastDelete) - { - this(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp, timestampOfLastDelete); - } - public CounterCell(CellName name, ByteBuffer value, long timestamp) { this(name, value, timestamp, Long.MIN_VALUE); @@ -68,6 +57,12 @@ public class CounterCell extends Cell return new CounterCell(name, value, timestamp, timestampOfLastDelete); } + // For use by tests of compatibility with pre-2.1 counter only. 
+ public static CounterCell createLocal(CellName name, long value, long timestamp, long timestampOfLastDelete) + { + return new CounterCell(name, contextManager.createLocal(value, HeapAllocator.instance), timestamp, timestampOfLastDelete); + } + @Override public Cell withUpdatedName(CellName newName) { @@ -110,12 +105,12 @@ public class CounterCell extends Cell // merging a CounterCell with a tombstone never return a tombstone // unless that tombstone timestamp is greater that the CounterCell // one. - assert !(cell instanceof DeletedCell) : "Wrong class type: " + cell.getClass(); + assert cell instanceof CounterCell : "Wrong class type: " + cell.getClass(); if (timestampOfLastDelete() < ((CounterCell) cell).timestampOfLastDelete()) return cell; - ContextRelationship rel = contextManager.diff(cell.value(), value()); - if (ContextRelationship.GREATER_THAN == rel || ContextRelationship.DISJOINT == rel) + CounterContext.Relationship rel = contextManager.diff(cell.value(), value()); + if (rel == CounterContext.Relationship.GREATER_THAN || rel == CounterContext.Relationship.DISJOINT) return cell; return null; } @@ -195,12 +190,6 @@ public class CounterCell extends Cell } @Override - public Cell localCopy(ColumnFamilyStore cfs) - { - return localCopy(cfs, HeapAllocator.instance); - } - - @Override public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator) { return new CounterCell(name.copy(allocator), allocator.clone(value), timestamp, timestampOfLastDelete); @@ -231,14 +220,6 @@ public class CounterCell extends Cell contextManager.validateContext(value()); } - /** - * Check if a given counterId is found in this CounterCell context. 
- */ - public boolean hasCounterId(CounterId id) - { - return contextManager.hasCounterId(value(), id); - } - public Cell markLocalToBeCleared() { ByteBuffer marked = contextManager.markLocalToBeCleared(value); http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/CounterMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java index 7dcb05c..6884c80 100644 --- a/src/java/org/apache/cassandra/db/CounterMutation.java +++ b/src/java/org/apache/cassandra/db/CounterMutation.java @@ -21,22 +21,21 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.UUID; - -import com.google.common.collect.Iterables; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.filter.NamesQueryFilter; +import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.HeapAllocator; +import org.apache.cassandra.service.CacheService; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.*; public class CounterMutation implements IMutation { @@ -76,67 +75,175 @@ public class CounterMutation implements IMutation return consistency; } - public Mutation makeReplicationMutation() + public MessageOut 
makeMutationMessage() + { + return new MessageOut<>(MessagingService.Verb.COUNTER_MUTATION, this, serializer); + } + + /** + * Applies the counter mutation, returns the result Mutation (for replication to other nodes). + * + * 1. Grabs the striped CF-level lock(s) + * 2. Gets the current values of the counters-to-be-modified from the counter cache + * 3. Reads the rest of the current values (cache misses) from the CF + * 4. Writes the updated counter values + * 5. Updates the counter cache + * 6. Releases the lock(s) + * + * See CASSANDRA-4775 and CASSANDRA-6504 for further details. + * + * @return the applied resulting Mutation + */ + public Mutation apply() throws WriteTimeoutException { - List readCommands = new LinkedList<>(); - long timestamp = System.currentTimeMillis(); - for (ColumnFamily columnFamily : mutation.getColumnFamilies()) + Mutation result = new Mutation(getKeyspaceName(), ByteBufferUtil.clone(key())); + Keyspace keyspace = Keyspace.open(getKeyspaceName()); + + ArrayList cfIds = new ArrayList<>(getColumnFamilyIds()); + Collections.sort(cfIds); // will lock in the sorted order, to avoid a potential deadlock. 
+ ArrayList locks = new ArrayList<>(cfIds.size()); + try { - if (!columnFamily.metadata().getReplicateOnWrite()) - continue; - addReadCommandFromColumnFamily(mutation.getKeyspaceName(), mutation.key(), columnFamily, timestamp, readCommands); + Tracing.trace("Acquiring {} counter locks", cfIds.size()); + for (UUID cfId : cfIds) + { + Lock lock = keyspace.getColumnFamilyStore(cfId).counterLockFor(key()); + if (!lock.tryLock(getTimeout(), TimeUnit.MILLISECONDS)) + throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace)); + locks.add(lock); + } + + for (ColumnFamily cf : getColumnFamilies()) + result.add(processModifications(cf)); + + result.apply(); + updateCounterCache(result, keyspace); + return result; + } + catch (InterruptedException e) + { + throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace)); + } + finally + { + for (Lock lock : locks) + lock.unlock(); } + } + + // Replaces all the CounterUpdateCell-s with updated regular CounterCell-s + private ColumnFamily processModifications(ColumnFamily changesCF) + { + Allocator allocator = HeapAllocator.instance; + ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changesCF.id()); - // create a replication Mutation - Mutation replicationMutation = new Mutation(mutation.getKeyspaceName(), mutation.key()); - for (ReadCommand readCommand : readCommands) + ColumnFamily resultCF = changesCF.cloneMeShallow(); + + List counterUpdateCells = new ArrayList<>(changesCF.getColumnCount()); + for (Cell cell : changesCF) { - Keyspace keyspace = Keyspace.open(readCommand.ksName); - Row row = readCommand.getRow(keyspace); - if (row == null || row.cf == null) - continue; + if (cell instanceof CounterUpdateCell) + counterUpdateCells.add((CounterUpdateCell)cell); + else + resultCF.addColumn(cell.localCopy(cfs, allocator)); + } - ColumnFamily cf = row.cf; - replicationMutation.add(cf); + if 
(counterUpdateCells.isEmpty()) + return resultCF; // only DELETEs + + ClockAndCount[] currentValues = getCurrentValues(counterUpdateCells, cfs); + for (int i = 0; i < counterUpdateCells.size(); i++) + { + ClockAndCount currentValue = currentValues[i]; + CounterUpdateCell update = counterUpdateCells.get(i); + + long clock = currentValue.clock + 1L; + long count = currentValue.count + update.delta(); + + resultCF.addColumn(new CounterCell(update.name().copy(allocator), + CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count, allocator), + update.timestamp())); } - return replicationMutation; + + return resultCF; } - private void addReadCommandFromColumnFamily(String keyspaceName, ByteBuffer key, ColumnFamily columnFamily, long timestamp, List commands) + // Attempt to load the current values(s) from cache. If that fails, read the rest from the cfs. + private ClockAndCount[] getCurrentValues(List counterUpdateCells, ColumnFamilyStore cfs) { - SortedSet s = new TreeSet<>(columnFamily.metadata().comparator); - Iterables.addAll(s, columnFamily.getColumnNames()); - commands.add(new SliceByNamesReadCommand(keyspaceName, key, columnFamily.metadata().cfName, timestamp, new NamesQueryFilter(s))); + ClockAndCount[] currentValues = new ClockAndCount[counterUpdateCells.size()]; + int remaining = counterUpdateCells.size(); + + if (CacheService.instance.counterCache.getCapacity() != 0) + { + Tracing.trace("Fetching {} counter values from cache", counterUpdateCells.size()); + remaining = getCurrentValuesFromCache(counterUpdateCells, cfs, currentValues); + if (remaining == 0) + return currentValues; + } + + Tracing.trace("Reading {} counter values from the CF", remaining); + getCurrentValuesFromCFS(counterUpdateCells, cfs, currentValues); + + return currentValues; } - public MessageOut makeMutationMessage() + // Returns the count of cache misses. 
+ private int getCurrentValuesFromCache(List counterUpdateCells, + ColumnFamilyStore cfs, + ClockAndCount[] currentValues) { - return new MessageOut<>(MessagingService.Verb.COUNTER_MUTATION, this, serializer); + int cacheMisses = 0; + for (int i = 0; i < counterUpdateCells.size(); i++) + { + ClockAndCount cached = cfs.getCachedCounter(key(), counterUpdateCells.get(i).name()); + if (cached != null) + currentValues[i] = cached; + else + cacheMisses++; + } + return cacheMisses; } - public boolean shouldReplicateOnWrite() + // Reads the missing current values from the CFS. + private void getCurrentValuesFromCFS(List counterUpdateCells, + ColumnFamilyStore cfs, + ClockAndCount[] currentValues) { - for (ColumnFamily cf : mutation.getColumnFamilies()) - if (cf.metadata().getReplicateOnWrite()) - return true; - return false; + SortedSet names = new TreeSet<>(cfs.metadata.comparator); + for (int i = 0; i < currentValues.length; i++) + if (currentValues[i] == null) + names.add(counterUpdateCells.get(i).name); + + ReadCommand cmd = new SliceByNamesReadCommand(getKeyspaceName(), key(), cfs.metadata.cfName, Long.MIN_VALUE, new NamesQueryFilter(names)); + Row row = cmd.getRow(cfs.keyspace); + ColumnFamily cf = row == null ? null : row.cf; + + for (int i = 0; i < currentValues.length; i++) + { + if (currentValues[i] != null) + continue; + + Cell cell = cf == null ? null : cf.getColumn(counterUpdateCells.get(i).name()); + if (cell == null || cell.isMarkedForDelete(Long.MIN_VALUE)) // absent or a tombstone. 
+ currentValues[i] = ClockAndCount.BLANK; + else + currentValues[i] = CounterContext.instance().getLocalClockAndCount(cell.value()); + } } - public void apply() + private void updateCounterCache(Mutation applied, Keyspace keyspace) { - // transform all CounterUpdateCell to CounterCell: accomplished by localCopy - Mutation m = new Mutation(mutation.getKeyspaceName(), ByteBufferUtil.clone(mutation.key())); - Keyspace keyspace = Keyspace.open(m.getKeyspaceName()); + if (CacheService.instance.counterCache.getCapacity() == 0) + return; - for (ColumnFamily cf_ : mutation.getColumnFamilies()) + for (ColumnFamily cf : applied.getColumnFamilies()) { - ColumnFamily cf = cf_.cloneMeShallow(); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf.id()); - for (Cell cell : cf_) - cf.addColumn(cell.localCopy(cfs), HeapAllocator.instance); - m.add(cf); + for (Cell cell : cf) + if (cell instanceof CounterCell) + cfs.putCachedCounter(key(), cell.name(), CounterContext.instance().getLocalClockAndCount(cell.value())); } - m.apply(); } public void addAll(IMutation m) @@ -147,6 +254,11 @@ public class CounterMutation implements IMutation mutation.addAll(cm.mutation); } + public long getTimeout() + { + return DatabaseDescriptor.getCounterWriteRpcTimeout(); + } + @Override public String toString() { @@ -176,7 +288,7 @@ public class CounterMutation implements IMutation public long serializedSize(CounterMutation cm, int version) { return Mutation.serializer.serializedSize(cm.mutation, version) - + TypeSizes.NATIVE.sizeof(cm.consistency.name()); + + TypeSizes.NATIVE.sizeof(cm.consistency.name()); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/CounterUpdateCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterUpdateCell.java b/src/java/org/apache/cassandra/db/CounterUpdateCell.java index dd2bf2a..f7a0ef1 100644 --- 
a/src/java/org/apache/cassandra/db/CounterUpdateCell.java +++ b/src/java/org/apache/cassandra/db/CounterUpdateCell.java @@ -21,10 +21,8 @@ import java.nio.ByteBuffer; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.utils.Allocator; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.HeapAllocator; /** * A counter update while it hasn't been applied yet by the leader replica. @@ -63,13 +61,12 @@ public class CounterUpdateCell extends Cell // The only time this could happen is if a batchAdd ships two // increment for the same cell. Hence we simply sums the delta. - assert (cell instanceof CounterUpdateCell) || (cell instanceof DeletedCell) : "Wrong class type."; - // tombstones take precedence if (cell.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired cell, so the current time is irrelevant return timestamp() > cell.timestamp() ? 
this : cell; // neither is tombstoned + assert cell instanceof CounterUpdateCell : "Wrong class type."; CounterUpdateCell c = (CounterUpdateCell) cell; return new CounterUpdateCell(name(), delta() + c.delta(), Math.max(timestamp(), c.timestamp())); } @@ -81,21 +78,9 @@ public class CounterUpdateCell extends Cell } @Override - public CounterCell localCopy(ColumnFamilyStore cfs) - { - return new CounterCell(name.copy(HeapAllocator.instance), - CounterContext.instance().createLocal(delta(), HeapAllocator.instance), - timestamp(), - Long.MIN_VALUE); - } - - @Override public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator) { - return new CounterCell(name.copy(allocator), - CounterContext.instance().createLocal(delta(), allocator), - timestamp(), - Long.MIN_VALUE); + throw new UnsupportedOperationException(); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/DeletedCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletedCell.java b/src/java/org/apache/cassandra/db/DeletedCell.java index 5b89e1d..13d1358 100644 --- a/src/java/org/apache/cassandra/db/DeletedCell.java +++ b/src/java/org/apache/cassandra/db/DeletedCell.java @@ -27,7 +27,6 @@ import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.Allocator; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.HeapAllocator; public class DeletedCell extends Cell { @@ -98,12 +97,6 @@ public class DeletedCell extends Cell } @Override - public Cell localCopy(ColumnFamilyStore cfs) - { - return new DeletedCell(name.copy(HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp); - } - - @Override public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator) { return new DeletedCell(name.copy(allocator), allocator.clone(value), timestamp); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/ExpiringCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ExpiringCell.java b/src/java/org/apache/cassandra/db/ExpiringCell.java index b15514e..a2f68da 100644 --- a/src/java/org/apache/cassandra/db/ExpiringCell.java +++ b/src/java/org/apache/cassandra/db/ExpiringCell.java @@ -27,7 +27,6 @@ import org.apache.cassandra.db.composites.CellNameType; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.Allocator; -import org.apache.cassandra.utils.HeapAllocator; /** * Alternative to Cell that have an expiring time. @@ -133,12 +132,6 @@ public class ExpiringCell extends Cell } @Override - public Cell localCopy(ColumnFamilyStore cfs) - { - return localCopy(cfs, HeapAllocator.instance); - } - - @Override public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator) { return new ExpiringCell(name.copy(allocator), allocator.clone(value), timestamp, timeToLive, localExpirationTime); http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/IMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java index 70bd79c..44df104 100644 --- a/src/java/org/apache/cassandra/db/IMutation.java +++ b/src/java/org/apache/cassandra/db/IMutation.java @@ -26,7 +26,7 @@ public interface IMutation public String getKeyspaceName(); public Collection getColumnFamilyIds(); public ByteBuffer key(); - public void apply(); + public long getTimeout(); public String toString(boolean shallow); public void addAll(IMutation m); public Collection getColumnFamilies(); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index d70d7f9..31d9503 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -26,6 +26,7 @@ import java.util.*; import org.apache.commons.lang3.StringUtils; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.composites.Composite; @@ -219,6 +220,11 @@ public class Mutation implements IMutation return new MessageOut<>(verb, this, serializer); } + public long getTimeout() + { + return DatabaseDescriptor.getWriteRpcTimeout(); + } + public String toString() { return toString(false); http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 39bdd15..f8637c1 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -693,27 +693,6 @@ public class SystemKeyspace forceBlockingFlush(COUNTER_ID_CF); } - public static List getOldLocalCounterIds() - { - List l = new ArrayList(); - - Keyspace keyspace = Keyspace.open(Keyspace.SYSTEM_KS); - QueryFilter filter = QueryFilter.getIdentityFilter(decorate(ALL_LOCAL_NODE_ID_KEY), COUNTER_ID_CF, System.currentTimeMillis()); - ColumnFamily cf = keyspace.getColumnFamilyStore(COUNTER_ID_CF).getColumnFamily(filter); - - CounterId previous = null; - for (Cell c : cf) - { - if (previous != null) - 
l.add(new CounterId.CounterIdRecord(previous, c.timestamp())); - - // this will ignore the last column on purpose since it is the - // current local node id - previous = CounterId.wrap(c.name().toByteBuffer()); - } - return l; - } - /** * @param cfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns) * @return CFS responsible to hold low-level serialized schema http://git-wip-us.apache.org/repos/asf/cassandra/blob/714c4233/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 2a8d68d..1414c3f 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -261,7 +261,7 @@ public class CompactionManager implements CompactionManagerMBean }); } - public void performCleanup(ColumnFamilyStore cfStore, final CounterId.OneShotRenewer renewer) throws InterruptedException, ExecutionException + public void performCleanup(ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException { performAllSSTableOperation(cfStore, new AllSSTablesOperation() { @@ -272,7 +272,7 @@ public class CompactionManager implements CompactionManagerMBean List sortedSSTables = Lists.newArrayList(sstables); Collections.sort(sortedSSTables, new SSTableReader.SizeComparator()); - doCleanupCompaction(store, sortedSSTables, renewer); + doCleanupCompaction(store, sortedSSTables); } }); } @@ -508,7 +508,7 @@ public class CompactionManager implements CompactionManagerMBean * * @throws IOException */ - private void doCleanupCompaction(final ColumnFamilyStore cfs, Collection sstables, CounterId.OneShotRenewer renewer) throws IOException + private void doCleanupCompaction(final ColumnFamilyStore cfs, Collection sstables) 
throws IOException { assert !cfs.isIndex(); Keyspace keyspace = cfs.keyspace; @@ -520,7 +520,7 @@ public class CompactionManager implements CompactionManagerMBean } boolean hasIndexes = cfs.indexManager.hasIndexes(); - CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges, renewer); + CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges); for (SSTableReader sstable : sstables) { @@ -614,12 +614,11 @@ public class CompactionManager implements CompactionManagerMBean private static abstract class CleanupStrategy { - public static CleanupStrategy get(ColumnFamilyStore cfs, Collection> ranges, CounterId.OneShotRenewer renewer) + public static CleanupStrategy get(ColumnFamilyStore cfs, Collection> ranges) { - if (cfs.indexManager.hasIndexes() || cfs.metadata.isCounter()) - return new Full(cfs, ranges, renewer); - - return new Bounded(cfs, ranges); + return cfs.indexManager.hasIndexes() + ? new Full(cfs, ranges) + : new Bounded(cfs, ranges); } public abstract ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter); @@ -660,14 +659,12 @@ public class CompactionManager implements CompactionManagerMBean private final Collection> ranges; private final ColumnFamilyStore cfs; private List indexedColumnsInRow; - private final CounterId.OneShotRenewer renewer; - public Full(ColumnFamilyStore cfs, Collection> ranges, CounterId.OneShotRenewer renewer) + public Full(ColumnFamilyStore cfs, Collection> ranges) { this.cfs = cfs; this.ranges = ranges; this.indexedColumnsInRow = null; - this.renewer = renewer; } @Override @@ -690,8 +687,6 @@ public class CompactionManager implements CompactionManagerMBean while (row.hasNext()) { OnDiskAtom column = row.next(); - if (column instanceof CounterCell) - renewer.maybeRenew((CounterCell) column); if (column instanceof Cell && cfs.indexManager.indexes((Cell) column)) {