From: jake@apache.org
To: commits@cassandra.apache.org
Date: Thu, 10 Mar 2016 15:26:34 -0000
Subject: [6/8] cassandra git commit: Establish and implement canonical bulk reading workload(s)

Establish and implement canonical bulk reading workload(s)

patch by Stefania Alborghetti; reviewed by Jake Luciani for CASSANDRA-10331

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f27ab290
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f27ab290
Diff: 
http://git-wip-us.apache.org/repos/asf/cassandra/diff/f27ab290

Branch: refs/heads/trunk
Commit: f27ab2908a06056b332e3a87008f0f8560a9620b
Parents: 9fd8dfc
Author: Stefania Alborghetti
Authored: Wed Mar 2 17:40:20 2016 +0800
Committer: T Jake Luciani
Committed: Thu Mar 10 10:23:51 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 tools/cqlstress-example.yaml                    |  13 +
 tools/stress/README.txt                         |   5 +
 .../org/apache/cassandra/stress/Operation.java  |  91 +------
 .../apache/cassandra/stress/StressAction.java   |  38 ++-
 .../apache/cassandra/stress/StressMetrics.java  |   2 +-
 .../apache/cassandra/stress/StressProfile.java  |  62 ++++-
 .../org/apache/cassandra/stress/StressYaml.java |   8 +
 .../apache/cassandra/stress/WorkManager.java    |   2 +-
 .../stress/generate/TokenRangeIterator.java     |  70 +++++
 .../operations/OpDistributionFactory.java       |   2 +-
 .../stress/operations/PartitionOperation.java   | 130 +++++++++
 .../SampledOpDistributionFactory.java           |  16 +-
 .../predefined/PredefinedOperation.java         |   3 +-
 .../operations/userdefined/SchemaStatement.java |   3 +-
 .../operations/userdefined/TokenRangeQuery.java | 270 +++++++++++++++++++
 .../userdefined/ValidatingSchemaQuery.java      |   3 +-
 .../cassandra/stress/settings/CliOption.java    |   3 +-
 .../settings/SettingsCommandPreDefined.java     |   2 +-
 .../SettingsCommandPreDefinedMixed.java         |   2 +-
 .../stress/settings/SettingsCommandUser.java    |  14 +-
 .../stress/settings/SettingsTokenRange.java     |  77 ++++++
 .../stress/settings/StressSettings.java         |   8 +-
 23 files changed, 701 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 58c7ed0..68f5714 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 3.5
 Merged from 3.0:
+ * Establish and implement canonical bulk reading workload(s) (CASSANDRA-10331)
  * Fix paging for IN queries on tables without clustering columns (CASSANDRA-11208)
  * Remove recursive call from CompositesSearcher (CASSANDRA-11304)
  * Fix filtering on non-primary key columns for queries without index (CASSANDRA-6377)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/cqlstress-example.yaml
----------------------------------------------------------------------
diff --git a/tools/cqlstress-example.yaml b/tools/cqlstress-example.yaml
index 3c60c32..835a4cb 100644
--- a/tools/cqlstress-example.yaml
+++ b/tools/cqlstress-example.yaml
@@ -93,3 +93,16 @@ queries:
    range1:
       cql: select * from typestest where name = ? and choice = ? and date >= ? LIMIT 100
       fields: multirow            # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
+
+
+#
+# A list of bulk read queries that analytics tools may perform against the schema
+# Each query will sweep an entire token range, page by page.
+#
+token_range_queries:
+  all_columns_tr_query:
+    columns: '*'
+    page_size: 5000
+
+  value_tr_query:
+    columns: value

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/README.txt
----------------------------------------------------------------------
diff --git a/tools/stress/README.txt b/tools/stress/README.txt
index 0046b25..aa89dab 100644
--- a/tools/stress/README.txt
+++ b/tools/stress/README.txt
@@ -72,6 +72,11 @@ Primary Options:
         The port to connect to cassandra nodes on
     -sendto:
         Specify a stress server to send this command to
+    -graph:
+        Graph recorded metrics
+    -tokenrange:
+        Token range settings
+
 
 Suboptions:
     Every command and primary option has its own collection of suboptions. These are too numerous to list here.
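
The token_range_queries entries in the example yaml describe a sweep that reads a token range one page at a time, with page_size rows per page. A minimal sketch of that paging arithmetic follows, assuming an in-memory list stands in for the rows of one token range. PagedSweep and pages are hypothetical names used only for illustration; this is not how TokenRangeQuery talks to the driver.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the commit: illustrates page-by-page reading only.
final class PagedSweep
{
    // Splits rows into consecutive pages holding at most pageSize rows each.
    static <T> List<List<T>> pages(List<T> rows, int pageSize)
    {
        if (pageSize <= 0)
            throw new IllegalArgumentException("pageSize must be positive");

        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += pageSize)
        {
            int end = Math.min(rows.size(), start + pageSize);
            result.add(new ArrayList<>(rows.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args)
    {
        // Assumed stand-in data: 12 rows read with a page size of 5.
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 12; i++)
            rows.add(i);

        for (List<Integer> page : pages(rows, 5))
            System.out.println("page with " + page.size() + " rows: " + page);
    }
}
```

The last page is simply shorter than page_size, which is how a reader can tell that a range has been fully swept.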

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 139dd53..0f13d3c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -19,13 +19,9 @@ package org.apache.cassandra.stress;
 
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import com.google.common.util.concurrent.RateLimiter;
 
-import org.apache.cassandra.stress.generate.*;
-import org.apache.cassandra.stress.settings.OptionRatioDistribution;
 import org.apache.cassandra.stress.settings.SettingsLog;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
@@ -38,45 +34,11 @@ public abstract class Operation
 {
     public final StressSettings settings;
     public final Timer timer;
-    protected final DataSpec spec;
-    private final static RatioDistribution defaultRowPopulationRatio = OptionRatioDistribution.BUILDER.apply("fixed(1)/1").get();
 
-    private final List partitionCache = new ArrayList<>();
-    protected List partitions;
-
-    public static final class DataSpec
-    {
-        public final PartitionGenerator partitionGenerator;
-        final SeedManager seedManager;
-        final Distribution partitionCount;
-        final RatioDistribution useRatio;
-        final RatioDistribution rowPopulationRatio;
-        final Integer targetCount;
-
-        public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution rowPopulationRatio, Integer targetCount)
-        {
-            this(partitionGenerator, seedManager, partitionCount, null, rowPopulationRatio, targetCount);
-        }
-        public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio)
-        {
-            this(partitionGenerator, seedManager, partitionCount, useRatio, rowPopulationRatio, null);
-        }
-        private DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio, Integer targetCount)
-        {
-            this.partitionGenerator = partitionGenerator;
-            this.seedManager = seedManager;
-            this.partitionCount = partitionCount;
-            this.useRatio = useRatio;
-            this.rowPopulationRatio = rowPopulationRatio == null ? defaultRowPopulationRatio : rowPopulationRatio;
-            this.targetCount = targetCount;
-        }
-    }
-
-    public Operation(Timer timer, StressSettings settings, DataSpec spec)
+    public Operation(Timer timer, StressSettings settings)
     {
         this.timer = timer;
         this.settings = settings;
-        this.spec = spec;
     }
 
     public static interface RunOp
@@ -86,48 +48,7 @@ public abstract class Operation
         public int rowCount();
     }
 
-    boolean ready(WorkManager permits, RateLimiter rateLimiter)
-    {
-        int partitionCount = (int) spec.partitionCount.next();
-        if (partitionCount <= 0)
-            return false;
-        partitionCount = permits.takePermits(partitionCount);
-        if (partitionCount <= 0)
-            return false;
-
-        int i = 0;
-        boolean success = true;
-        for (; i < partitionCount && success ; i++)
-        {
-            if (i >= partitionCache.size())
-                partitionCache.add(PartitionIterator.get(spec.partitionGenerator, spec.seedManager));
-
-            success = false;
-            while (!success)
-            {
-                Seed seed = spec.seedManager.next(this);
-                if (seed == null)
-                    break;
-
-                success = reset(seed, partitionCache.get(i));
-            }
-        }
-        partitionCount = i;
-
-        if (rateLimiter != null)
-            rateLimiter.acquire(partitionCount);
-
-        partitions = partitionCache.subList(0, partitionCount);
-        return !partitions.isEmpty();
-    }
-
-    protected boolean reset(Seed seed, PartitionIterator iterator)
-    {
-        if (spec.useRatio == null)
-            return iterator.reset(seed, spec.targetCount, spec.rowPopulationRatio.next(), isWrite());
-        else
-            return iterator.reset(seed, spec.useRatio.next(), spec.rowPopulationRatio.next(), isWrite());
-    }
+    public abstract boolean ready(WorkManager permits, RateLimiter rateLimiter);
 
     public boolean isWrite()
     {
@@ -202,13 +123,7 @@ public abstract class Operation
 
     }
 
-    private String key()
-    {
-        List keys = new ArrayList<>();
-        for (PartitionIterator partition : partitions)
-            keys.add(partition.getKeyAsString());
-        return keys.toString();
-    }
+    public abstract String key();
 
     protected String getExceptionMessage(Exception e)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 3150e14..ebe6270 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -71,7 +71,7 @@ public class StressAction implements Runnable
             success = runMulti(settings.rate.auto, rateLimiter);
         else
             success = null != run(settings.command.getFactory(settings), settings.rate.threadCount, settings.command.count,
-                                  settings.command.duration, rateLimiter, settings.command.durationUnits, output);
+                                  settings.command.duration, rateLimiter, settings.command.durationUnits, output, false);
 
         if (success)
             output.println("END");
@@ -84,12 +84,12 @@ public class StressAction implements Runnable
     // type provided separately to support recursive call for mixed command with each command type it is performing
     private void warmup(OpDistributionFactory operations)
     {
-        // warmup - do 50k iterations; by default hotspot compiles methods after 10k invocations
         PrintStream warmupOutput = new PrintStream(new OutputStream() { @Override public void write(int b) throws IOException { } } );
-        int iterations = 50000 * settings.node.nodes.size();
+        // do 25% of iterations as warmup but no more than 50k (by default hotspot compiles methods after 10k invocations)
+        int iterations = (settings.command.count > 0
+                          ? Math.min(50000, (int)(settings.command.count * 0.25))
+                          : 50000) * settings.node.nodes.size();
         int threads = 100;
-        if (iterations > settings.command.count && settings.command.count > 0)
-            return;
 
         if (settings.rate.maxThreads > 0)
             threads = Math.min(threads, settings.rate.maxThreads);
@@ -101,7 +101,7 @@ public class StressAction implements Runnable
             // we need to warm up all the nodes in the cluster ideally, but we may not be the only stress instance;
             // so warm up all the nodes we're speaking to only.
             output.println(String.format("Warming up %s with %d iterations...", single.desc(), iterations));
-            run(single, threads, iterations, 0, null, null, warmupOutput);
+            run(single, threads, iterations, 0, null, null, warmupOutput, true);
         }
     }
 
@@ -124,7 +124,7 @@ public class StressAction implements Runnable
                 settings.command.truncateTables(settings);
 
             StressMetrics result = run(settings.command.getFactory(settings), threadCount, settings.command.count,
-                                       settings.command.duration, rateLimiter, settings.command.durationUnits, output);
+                                       settings.command.duration, rateLimiter, settings.command.durationUnits, output, false);
             if (result == null)
                 return false;
             results.add(result);
@@ -181,7 +181,14 @@ public class StressAction implements Runnable
         return improvement / count;
     }
 
-    private StressMetrics run(OpDistributionFactory operations, int threadCount, long opCount, long duration, RateLimiter rateLimiter, TimeUnit durationUnits, PrintStream output)
+    private StressMetrics run(OpDistributionFactory operations,
+                              int threadCount,
+                              long opCount,
+                              long duration,
+                              RateLimiter rateLimiter,
+                              TimeUnit durationUnits,
+                              PrintStream output,
+                              boolean isWarmup)
     {
         output.println(String.format("Running %s with %d threads %s",
                                      operations.desc(),
@@ -199,10 +206,12 @@ public class StressAction implements Runnable
 
         final CountDownLatch done = new CountDownLatch(threadCount);
         final Consumer[] consumers = new Consumer[threadCount];
+        int sampleCount = settings.samples.liveCount / threadCount;
         for (int i = 0; i < threadCount; i++)
         {
-            consumers[i] = new Consumer(operations, done, workManager, metrics, rateLimiter,
-                                        settings.samples.liveCount / threadCount);
+
+            consumers[i] = new Consumer(operations.get(metrics.getTiming(), sampleCount, isWarmup),
+                                        done, workManager, metrics, rateLimiter);
         }
 
         // starting worker threadCount
@@ -259,14 +268,17 @@ public class StressAction implements Runnable
         private final WorkManager workManager;
         private final CountDownLatch done;
 
-        public Consumer(OpDistributionFactory operations, CountDownLatch done, WorkManager workManager, StressMetrics metrics,
-                        RateLimiter rateLimiter, int sampleCount)
+        public Consumer(OpDistribution operations,
+                        CountDownLatch done,
+                        WorkManager workManager,
+                        StressMetrics metrics,
+                        RateLimiter rateLimiter)
         {
             this.done = done;
             this.rateLimiter = rateLimiter;
             this.workManager = workManager;
             this.metrics = metrics;
-            this.operations = operations.get(metrics.getTiming(), sampleCount);
+            this.operations = operations;
         }
 
         public void run()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
index 0190ed8..3585a00 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -158,7 +158,7 @@ public class StressMetrics
         TimingInterval current = result.intervals.combine(settings.samples.reportCount);
         TimingInterval history = timing.getHistory().combine(settings.samples.historyCount);
         rowRateUncertainty.update(current.adjustedRowRate());
-        if (current.partitionCount != 0)
+        if (current.operationCount != 0)
         {
             if (result.intervals.intervals().size() > 1)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 297a004..5243d96 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.generate.values.*;
+import org.apache.cassandra.stress.operations.userdefined.TokenRangeQuery;
 import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
 import org.apache.cassandra.stress.operations.userdefined.SchemaQuery;
 import org.apache.cassandra.stress.operations.userdefined.ValidatingSchemaQuery;
@@ -66,9 +67,11 @@ public class StressProfile implements Serializable
     public String tableName;
     private Map columnConfigs;
     private Map queries;
+    public Map tokenRangeQueries;
     private Map insert;
 
     transient volatile TableMetadata tableMetaData;
+    transient volatile Set tokenRanges;
 
     transient volatile GeneratorFactory generatorFactory;
 
@@ -92,6 +95,7 @@ public class StressProfile implements Serializable
         tableCql = yaml.table_definition;
         seedStr = "seed for stress";
         queries = yaml.queries;
+        tokenRangeQueries = yaml.token_range_queries;
         insert = yaml.insert;
 
         extraSchemaDefinitions = yaml.extra_definitions;
@@ -100,6 +104,10 @@ public class StressProfile implements Serializable
         assert tableName != null : "table name is required in yaml file";
         assert queries != null : "queries map is required in yaml file";
 
+        for (String query : queries.keySet())
+            assert !tokenRangeQueries.containsKey(query) :
+                String.format("Found %s in both queries and token_range_queries, please use different names", query);
+
         if (keyspaceCql != null && keyspaceCql.length() > 0)
         {
             try
@@ -254,8 +262,48 @@ public class StressProfile implements Serializable
         }
     }
 
-    public SchemaQuery getQuery(String name, Timer timer, PartitionGenerator generator, SeedManager seeds, StressSettings settings)
+    public Set maybeLoadTokenRanges(StressSettings settings)
     {
+        maybeLoadSchemaInfo(settings); // ensure table metadata is available
+
+        JavaDriverClient client = settings.getJavaDriverClient();
+        synchronized (client)
+        {
+            if (tokenRanges != null)
+                return tokenRanges;
+
+            Cluster cluster = client.getCluster();
+            Metadata metadata = cluster.getMetadata();
+            if (metadata == null)
+                throw new RuntimeException("Unable to get metadata");
+
+            List sortedRanges = new ArrayList<>(metadata.getTokenRanges().size() + 1);
+            for (TokenRange range : metadata.getTokenRanges())
+            {
+                //if we don't unwrap we miss the partitions between ring min and smallest range start value
+                if (range.isWrappedAround())
+                    sortedRanges.addAll(range.unwrap());
+                else
+                    sortedRanges.add(range);
+            }
+
+            Collections.sort(sortedRanges);
+            tokenRanges = new LinkedHashSet<>(sortedRanges);
+            return tokenRanges;
+        }
+    }
+
+    public Operation getQuery(String name,
+                              Timer timer,
+                              PartitionGenerator generator,
+                              SeedManager seeds,
+                              StressSettings settings,
+                              boolean isWarmup)
+    {
+        name = name.toLowerCase();
+        if (!queries.containsKey(name))
+            throw new IllegalArgumentException("No query defined with name " + name);
+
         if (queryStatements == null)
         {
             synchronized (this)
@@ -296,13 +344,19 @@ public class StressProfile implements Serializable
             }
         }
 
-        name = name.toLowerCase();
-        if (!queryStatements.containsKey(name))
-            throw new IllegalArgumentException("No query defined with name " + name);
 
         return new SchemaQuery(timer, settings, generator, seeds, thriftQueryIds.get(name), queryStatements.get(name), ThriftConversion.fromThrift(settings.command.consistencyLevel), argSelects.get(name));
     }
 
+    public Operation getBulkReadQueries(String name, Timer timer, StressSettings settings, TokenRangeIterator tokenRangeIterator, boolean isWarmup)
+    {
+        StressYaml.TokenRangeQueryDef def = tokenRangeQueries.get(name);
+        if (def == null)
+            throw new IllegalArgumentException("No bulk read query defined with name " + name);
+
+        return new TokenRangeQuery(timer, settings, tableMetaData, tokenRangeIterator, def, isWarmup);
+    }
+
     public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
     {
         if (insertStatement == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
index 90797b4..214e56a 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java
@@ -20,6 +20,7 @@
  */
 package org.apache.cassandra.stress;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -35,6 +36,7 @@ public class StressYaml
     public List> columnspec;
     public Map queries;
     public Map insert;
+    public Map token_range_queries = new HashMap<>();
 
     public static class QueryDef
     {
@@ -42,4 +44,10 @@ public class StressYaml
         public String fields;
     }
 
+    public static class TokenRangeQueryDef
+    {
+        public String columns;
+        public int page_size = 5000;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/WorkManager.java b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
index c6a3eee..78d4176 100644
--- a/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
+++ b/tools/stress/src/org/apache/cassandra/stress/WorkManager.java
@@ -2,7 +2,7 @@ package org.apache.cassandra.stress;
 
 import java.util.concurrent.atomic.AtomicLong;
 
-interface WorkManager
+public interface WorkManager
 {
     // -1 indicates consumer should terminate
     int takePermits(int count);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java b/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java
new file mode 100644
index 0000000..5ddac61
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/TokenRangeIterator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.generate;
+
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.datastax.driver.core.TokenRange;
+import org.apache.cassandra.stress.settings.StressSettings;
+
+public class TokenRangeIterator
+{
+    private final Set tokenRanges;
+    private final ConcurrentLinkedQueue pendingRanges;
+    private final boolean wrap;
+
+    public TokenRangeIterator(StressSettings settings, Set tokenRanges)
+    {
+        this.tokenRanges = maybeSplitRanges(tokenRanges, settings.tokenRange.splitFactor);
+        this.pendingRanges = new ConcurrentLinkedQueue<>(this.tokenRanges);
+        this.wrap = settings.tokenRange.wrap;
+    }
+
+    private static Set maybeSplitRanges(Set tokenRanges, int splitFactor)
+    {
+        if (splitFactor <= 1)
+            return tokenRanges;
+
+        Set ret = new TreeSet<>();
+        for (TokenRange range : tokenRanges)
+            ret.addAll(range.splitEvenly(splitFactor));
+
+        return ret;
+    }
+
+    public void update()
+    {
+        // we may race and add to the queue twice but no bad consequence so it's fine if that happens
+        // as ultimately only the permits determine when to stop if wrap is true
+        if (wrap && pendingRanges.isEmpty())
+            pendingRanges.addAll(tokenRanges);
+    }
+
+    public TokenRange next()
+    {
+        return pendingRanges.poll();
+    }
+
+    public boolean exhausted()
+    {
+        return pendingRanges.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
index 7e13fcd..5fbb0f9 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
@@ -25,7 +25,7 @@ import org.apache.cassandra.stress.util.Timing;
 
 public interface OpDistributionFactory
 {
-    public OpDistribution get(Timing timing, int sampleCount);
+    public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup);
     public String desc();
     Iterable each();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
new file mode 100644
index 0000000..45c36f2
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.operations;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.WorkManager;
+import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.PartitionIterator;
+import org.apache.cassandra.stress.generate.RatioDistribution;
+import org.apache.cassandra.stress.generate.Seed;
+import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.settings.OptionRatioDistribution;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.Timer;
+
+public abstract class PartitionOperation extends Operation
+{
+    protected final DataSpec spec;
+    private final static RatioDistribution defaultRowPopulationRatio = OptionRatioDistribution.BUILDER.apply("fixed(1)/1").get();
+
+    private final List partitionCache = new ArrayList<>();
+    protected List partitions;
+
+    public static final class DataSpec
+    {
+        public final PartitionGenerator partitionGenerator;
+        final SeedManager seedManager;
+        final Distribution partitionCount;
+        final RatioDistribution useRatio;
+        final RatioDistribution rowPopulationRatio;
+        final Integer targetCount;
+
+        public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution rowPopulationRatio, Integer targetCount)
+        {
+            this(partitionGenerator, seedManager, partitionCount, null, rowPopulationRatio, targetCount);
+        }
+        public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio)
+        {
+            this(partitionGenerator, seedManager, partitionCount, useRatio, rowPopulationRatio, null);
+        }
+        private DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio, Integer targetCount)
+        {
+            this.partitionGenerator = partitionGenerator;
+            this.seedManager = seedManager;
+            this.partitionCount = partitionCount;
+            this.useRatio = useRatio;
+            this.rowPopulationRatio = rowPopulationRatio == null ? defaultRowPopulationRatio : rowPopulationRatio;
+            this.targetCount = targetCount;
+        }
+    }
+
+    public PartitionOperation(Timer timer, StressSettings settings, DataSpec spec)
+    {
+        super(timer, settings);
+        this.spec = spec;
+    }
+
+    public boolean ready(WorkManager permits, RateLimiter rateLimiter)
+    {
+        int partitionCount = (int) spec.partitionCount.next();
+        if (partitionCount <= 0)
+            return false;
+        partitionCount = permits.takePermits(partitionCount);
+        if (partitionCount <= 0)
+            return false;
+
+        int i = 0;
+        boolean success = true;
+        for (; i < partitionCount && success ; i++)
+        {
+            if (i >= partitionCache.size())
+                partitionCache.add(PartitionIterator.get(spec.partitionGenerator, spec.seedManager));
+
+            success = false;
+            while (!success)
+            {
+                Seed seed = spec.seedManager.next(this);
+                if (seed == null)
+                    break;
+
+                success = reset(seed, partitionCache.get(i));
+            }
+        }
+        partitionCount = i;
+
+        if (rateLimiter != null)
+            rateLimiter.acquire(partitionCount);
+
+        partitions = partitionCache.subList(0, partitionCount);
+        return !partitions.isEmpty();
+    }
+
+    protected boolean reset(Seed seed, PartitionIterator iterator)
+    {
+        if (spec.useRatio == null)
+            return iterator.reset(seed, spec.targetCount, spec.rowPopulationRatio.next(), isWrite());
+        else
+            return iterator.reset(seed, spec.useRatio.next(), spec.rowPopulationRatio.next(), isWrite());
+    }
+
+    public String key()
+    {
+        List keys = new ArrayList<>();
+        for (PartitionIterator partition : partitions)
+            keys.add(partition.getKeyAsString());
+        return keys.toString();
+    }
+}
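
PartitionOperation keeps the seed-driven path, while the TokenRangeIterator added by this commit hands out ranges from a queue and, when wrap is set, refills that queue once it has drained. The sketch below mirrors that update/next/exhausted contract under simplifying assumptions: plain strings stand in for the driver's TokenRange, the wrap flag is passed directly instead of being read from StressSettings, and range splitting is left out. WrappingRangeQueue is a hypothetical name, not a class in the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for TokenRangeIterator; strings replace TokenRange for illustration.
final class WrappingRangeQueue
{
    private final List<String> ranges;
    private final ConcurrentLinkedQueue<String> pending;
    private final boolean wrap;

    WrappingRangeQueue(List<String> ranges, boolean wrap)
    {
        this.ranges = new ArrayList<>(ranges);
        this.pending = new ConcurrentLinkedQueue<>(this.ranges);
        this.wrap = wrap;
    }

    // Refills the queue only when wrapping is enabled and nothing is pending.
    void update()
    {
        if (wrap && pending.isEmpty())
            pending.addAll(ranges);
    }

    // Returns the next pending range, or null when the queue is empty.
    String next()
    {
        return pending.poll();
    }

    boolean exhausted()
    {
        return pending.isEmpty();
    }

    public static void main(String[] args)
    {
        WrappingRangeQueue queue = new WrappingRangeQueue(List.of("r1", "r2"), true);
        for (int i = 0; i < 5; i++)
        {
            queue.update();
            System.out.println(queue.next());
        }
    }
}
```

With wrap disabled, next() starts returning null once every range has been handed out, so a caller has to treat null as "no more work".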
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java index 194f84f..a10585d 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java @@ -44,16 +44,17 @@ public abstract class SampledOpDistributionFactory implements OpDistributionF this.clustering = clustering; } - protected abstract List get(Timer timer, PartitionGenerator generator, T key); + protected abstract List get(Timer timer, PartitionGenerator generator, T key, boolean isWarmup); protected abstract PartitionGenerator newGenerator(); - public OpDistribution get(Timing timing, int sampleCount) + public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup) { PartitionGenerator generator = newGenerator(); List> operations = new ArrayList<>(); for (Map.Entry ratio : ratios.entrySet()) { - List ops = get(timing.newTimer(ratio.getKey().toString(), sampleCount), generator, ratio.getKey()); + List ops = get(timing.newTimer(ratio.getKey().toString(), sampleCount), + generator, ratio.getKey(), isWarmup); for (Operation op : ops) operations.add(new Pair<>(op, ratio.getValue() / ops.size())); } @@ -75,15 +76,18 @@ public abstract class SampledOpDistributionFactory implements OpDistributionF { out.add(new OpDistributionFactory() { - public OpDistribution get(Timing timing, int sampleCount) + public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup) { - List ops = SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(), sampleCount), newGenerator(), ratio.getKey()); 
+                    List ops = SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(), sampleCount),
+                                                                     newGenerator(),
+                                                                     ratio.getKey(),
+                                                                     isWarmup);
                     if (ops.size() == 1)
                         return new FixedOpDistribution(ops.get(0));
                     List<Pair<Operation, Double>> ratios = new ArrayList<>();
                     for (Operation op : ops)
                         ratios.add(new Pair<>(op, 1d / ops.size()));
-                    return new SampledOpDistribution(new EnumeratedDistribution(ratios), new DistributionFixed(1));
+                    return new SampledOpDistribution(new EnumeratedDistribution<>(ratios), new DistributionFixed(1));
                 }
 
                 public String desc()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
index b435abb..1f9a2c8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.*;
+import org.apache.cassandra.stress.operations.PartitionOperation;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.CqlVersion;
 import org.apache.cassandra.stress.settings.StressSettings;
@@ -32,7 +33,7 @@ import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
 
-public abstract class PredefinedOperation extends Operation
+public abstract class PredefinedOperation extends PartitionOperation
 {
     public static final byte[] EMPTY_BYTE_ARRAY = {};
     public final Command type;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
index 1c88490..166d689 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
@@ -32,11 +32,12 @@ import com.datastax.driver.core.PreparedStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.operations.PartitionOperation;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.Timer;
 
-public abstract class SchemaStatement extends Operation
+public abstract class SchemaStatement extends PartitionOperation
 {
 
     final PreparedStatement statement;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
new file mode 100644
index 0000000..60a6c48
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.operations.userdefined;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.naming.OperationNotSupportedException;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.PagingState;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.Token;
+import com.datastax.driver.core.TokenRange;
+import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.StressYaml;
+import org.apache.cassandra.stress.WorkManager;
+import org.apache.cassandra.stress.generate.TokenRangeIterator;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+
+public class TokenRangeQuery extends Operation
+{
+    private final ThreadLocal<State> currentState = new ThreadLocal<>();
+
+    private final
TableMetadata tableMetadata;
+    private final TokenRangeIterator tokenRangeIterator;
+    private final String columns;
+    private final int pageSize;
+    private final boolean isWarmup;
+
+    public TokenRangeQuery(Timer timer,
+                           StressSettings settings,
+                           TableMetadata tableMetadata,
+                           TokenRangeIterator tokenRangeIterator,
+                           StressYaml.TokenRangeQueryDef def,
+                           boolean isWarmup)
+    {
+        super(timer, settings);
+        this.tableMetadata = tableMetadata;
+        this.tokenRangeIterator = tokenRangeIterator;
+        this.columns = sanitizeColumns(def.columns, tableMetadata);
+        this.pageSize = isWarmup ? Math.min(100, def.page_size) : def.page_size;
+        this.isWarmup = isWarmup;
+    }
+
+    /**
+     * We need to specify the columns by name because we need to add token(partition_keys) in order to count
+     * partitions. So if the user specifies '*' then replace it with a list of all columns.
+     */
+    private static String sanitizeColumns(String columns, TableMetadata tableMetadata)
+    {
+        if (!columns.equals("*"))
+            return columns;
+
+        return String.join(", ", tableMetadata.getColumns().stream().map(ColumnMetadata::getName).collect(Collectors.toList()));
+    }
+
+    /**
+     * The state of a token range currently being retrieved.
+     * Here we store the paging state to retrieve more pages
+     * and we keep track of which partitions have already been retrieved,
+     */
+    private final static class State
+    {
+        public final TokenRange tokenRange;
+        public final String query;
+        public PagingState pagingState;
+        public Set<Token> partitions = new HashSet<>();
+
+        public State(TokenRange tokenRange, String query)
+        {
+            this.tokenRange = tokenRange;
+            this.query = query;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("[%s, %s]", tokenRange.getStart(), tokenRange.getEnd());
+        }
+    }
+
+    abstract static class Runner implements RunOp
+    {
+        int partitionCount;
+        int rowCount;
+
+        @Override
+        public int partitionCount()
+        {
+            return partitionCount;
+        }
+
+        @Override
+        public int rowCount()
+        {
+            return rowCount;
+        }
+    }
+
+    private class JavaDriverRun extends Runner
+    {
+        final JavaDriverClient client;
+
+        private JavaDriverRun(JavaDriverClient client)
+        {
+            this.client = client;
+        }
+
+        public boolean run() throws Exception
+        {
+            State state = currentState.get();
+            if (state == null)
+            { // start processing a new token range
+                TokenRange range = tokenRangeIterator.next();
+                if (range == null)
+                    return true; // no more token ranges to process
+
+                state = new State(range, buildQuery(range));
+                currentState.set(state);
+            }
+
+            ResultSet results;
+            Statement statement = new SimpleStatement(state.query);
+            statement.setFetchSize(pageSize);
+
+            if (state.pagingState != null)
+                statement.setPagingState(state.pagingState);
+
+            results = client.getSession().execute(statement);
+            state.pagingState = results.getExecutionInfo().getPagingState();
+
+            int remaining = results.getAvailableWithoutFetching();
+            rowCount += remaining;
+
+            for (Row row : results)
+            {
+                // this call will only succeed if we've added token(partition keys) to the query
+                Token partition = row.getPartitionKeyToken();
+                if (!state.partitions.contains(partition))
+                {
+                    partitionCount += 1;
+                    state.partitions.add(partition);
+                }
+
+                if
(--remaining == 0)
+                    break;
+            }
+
+            if (results.isExhausted() || isWarmup)
+            { // no more pages to fetch or just warming up, ready to move on to another token range
+                currentState.set(null);
+            }
+
+            return true;
+        }
+    }
+
+    private String buildQuery(TokenRange tokenRange)
+    {
+        Token start = tokenRange.getStart();
+        Token end = tokenRange.getEnd();
+        List<String> pkColumns = tableMetadata.getPartitionKey().stream().map(ColumnMetadata::getName).collect(Collectors.toList());
+        String tokenStatement = String.format("token(%s)", String.join(", ", pkColumns));
+
+        StringBuilder ret = new StringBuilder();
+        ret.append("SELECT ");
+        ret.append(tokenStatement); // add the token(pk) statement so that we can count partitions
+        ret.append(", ");
+        ret.append(columns);
+        ret.append(" FROM ");
+        ret.append(tableMetadata.getName());
+        if (start != null || end != null)
+            ret.append(" WHERE ");
+        if (start != null)
+        {
+            ret.append(tokenStatement);
+            ret.append(" > ");
+            ret.append(start.toString());
+        }
+
+        if (start != null && end != null)
+            ret.append(" AND ");
+
+        if (end != null)
+        {
+            ret.append(tokenStatement);
+            ret.append(" <= ");
+            ret.append(end.toString());
+        }
+
+        return ret.toString();
+    }
+
+    private static class ThriftRun extends Runner
+    {
+        final ThriftClient client;
+
+        private ThriftRun(ThriftClient client)
+        {
+            this.client = client;
+        }
+
+        public boolean run() throws Exception
+        {
+            throw new OperationNotSupportedException("Bulk read over thrift not supported");
+        }
+    }
+
+
+    @Override
+    public void run(JavaDriverClient client) throws IOException
+    {
+        timeWithRetry(new JavaDriverRun(client));
+    }
+
+    @Override
+    public void run(ThriftClient client) throws IOException
+    {
+        timeWithRetry(new ThriftRun(client));
+    }
+
+    public boolean ready(WorkManager workManager, RateLimiter rateLimiter)
+    {
+        tokenRangeIterator.update();
+
+        if (tokenRangeIterator.exhausted() && currentState.get() == null)
+            return false;
+
+        int numLeft = workManager.takePermits(1);
+        if
(rateLimiter != null && numLeft > 0 )
+            rateLimiter.acquire(numLeft);
+
+        return numLeft > 0;
+    }
+
+    public String key()
+    {
+        State state = currentState.get();
+        return state == null ? "-" : state.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
index c07328a..33f6f80 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.operations.PartitionOperation;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
@@ -44,7 +45,7 @@ import org.apache.cassandra.thrift.ThriftConversion;
 import org.apache.cassandra.utils.Pair;
 import org.apache.thrift.TException;
 
-public class ValidatingSchemaQuery extends Operation
+public class ValidatingSchemaQuery extends PartitionOperation
 {
     private Pair bounds;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java b/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
index 30933d9..6d8e184
100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/CliOption.java
@@ -39,7 +39,8 @@ public enum CliOption
     TRANSPORT("Custom transport factories", SettingsTransport.helpPrinter()),
     PORT("The port to connect to cassandra nodes on", SettingsPort.helpPrinter()),
     SENDTO("-send-to", "Specify a stress server to send this command to", SettingsMisc.sendToDaemonHelpPrinter()),
-    GRAPH("-graph", "Graph recorded metrics", SettingsGraph.helpPrinter())
+    GRAPH("-graph", "Graph recorded metrics", SettingsGraph.helpPrinter()),
+    TOKENRANGE("Token range settings", SettingsTokenRange.helpPrinter())
     ;
 
     private static final Map LOOKUP;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
index 83f444c..c2f2591 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
@@ -51,7 +51,7 @@ public class SettingsCommandPreDefined extends SettingsCommand
         final SeedManager seeds = new SeedManager(settings);
         return new OpDistributionFactory()
         {
-            public OpDistribution get(Timing timing, int sampleCount)
+            public OpDistribution get(Timing timing, int sampleCount, boolean isWarmup)
             {
                 return new FixedOpDistribution(PredefinedOperation.operation(type, timing.newTimer(type.toString(), sampleCount),
                         newGenerator(settings), seeds, settings, add));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
index 861b1a4..dd11452 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefinedMixed.java
@@ -55,7 +55,7 @@ public class SettingsCommandPreDefinedMixed extends SettingsCommandPreDefined
         final SeedManager seeds = new SeedManager(settings);
         return new SampledOpDistributionFactory(ratios, clustering)
         {
-            protected List get(Timer timer, PartitionGenerator generator, Command key)
+            protected List get(Timer timer, PartitionGenerator generator, Command key, boolean isWarmup)
             {
                 return Collections.singletonList(PredefinedOperation.operation(key, timer, generator, seeds, settings, add));
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
index 8440e8e..36cbefe 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.stress.StressProfile;
 import org.apache.cassandra.stress.generate.DistributionFactory;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.generate.TokenRangeIterator;
 import org.apache.cassandra.stress.operations.OpDistributionFactory;
 import
org.apache.cassandra.stress.operations.SampledOpDistributionFactory;
 import org.apache.cassandra.stress.util.Timer;
@@ -69,15 +70,24 @@ public class SettingsCommandUser extends SettingsCommand
     public OpDistributionFactory getFactory(final StressSettings settings)
     {
         final SeedManager seeds = new SeedManager(settings);
+        final TokenRangeIterator tokenRangeIterator = profile.tokenRangeQueries.isEmpty()
+                                                      ? null
+                                                      : new TokenRangeIterator(settings,
+                                                                               profile.maybeLoadTokenRanges(settings));
+
         return new SampledOpDistributionFactory(ratios, clustering)
         {
-            protected List get(Timer timer, PartitionGenerator generator, String key)
+            protected List get(Timer timer, PartitionGenerator generator, String key, boolean isWarmup)
             {
                 if (key.equalsIgnoreCase("insert"))
                     return Collections.singletonList(profile.getInsert(timer, generator, seeds, settings));
                 if (key.equalsIgnoreCase("validate"))
                     return profile.getValidate(timer, generator, seeds, settings);
-                return Collections.singletonList(profile.getQuery(key, timer, generator, seeds, settings));
+
+                if (profile.tokenRangeQueries.containsKey(key))
+                    return Collections.singletonList(profile.getBulkReadQueries(key, timer, settings, tokenRangeIterator, isWarmup));
+
+                return Collections.singletonList(profile.getQuery(key, timer, generator, seeds, settings, isWarmup));
             }
 
             protected PartitionGenerator newGenerator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f27ab290/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java
new file mode 100644
index 0000000..8fb0048
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTokenRange.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor
license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.stress.settings;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
+
+public class SettingsTokenRange
+{
+    public final boolean wrap;
+    public final int splitFactor;
+
+    public SettingsTokenRange(TokenRangeOptions options)
+    {
+        this.wrap = options.wrap.setByUser();
+        this.splitFactor = Ints.checkedCast(OptionDistribution.parseLong(options.splitFactor.value()));
+    }
+
+    private static final class TokenRangeOptions extends GroupedOptions
+    {
+        final OptionSimple wrap = new OptionSimple("wrap", "", null, "Re-use token ranges in order to terminate stress iterations", false);
+        final OptionSimple splitFactor = new OptionSimple("split-factor=", "[0-9]+[bmk]?", "1", "Split every token range by this factor", false);
+
+
+        @Override
+        public List options()
+        {
+            return ImmutableList.