cassandra-commits mailing list archives

From: brandonwilli...@apache.org
Subject: svn commit: r1002245 - in /cassandra/trunk/contrib/py_stress: README.txt stress.py
Date: Tue, 28 Sep 2010 16:31:10 GMT
Author: brandonwilliams
Date: Tue Sep 28 16:31:10 2010
New Revision: 1002245

URL: http://svn.apache.org/viewvc?rev=1002245&view=rev
Log:
Add secondary index ops to stress.py.  Patch by Stu Hood, reviewed by brandonwilliams for
CASSANDRA-1531

Modified:
    cassandra/trunk/contrib/py_stress/README.txt
    cassandra/trunk/contrib/py_stress/stress.py

Modified: cassandra/trunk/contrib/py_stress/README.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/py_stress/README.txt?rev=1002245&r1=1002244&r2=1002245&view=diff
==============================================================================
--- cassandra/trunk/contrib/py_stress/README.txt (original)
+++ cassandra/trunk/contrib/py_stress/README.txt Tue Sep 28 16:31:10 2010
@@ -33,10 +33,11 @@ There are three different modes of opera
     * inserting (loading test data)
     * reading
     * range slicing (only works with the OrderPreservingPartitioner)
+    * indexed range slicing (works with RandomPartitioner on indexed ColumnFamilies)
 
 Important options:
     -o or --operation
-        Sets the operation mode, one of 'insert', 'read', or 'rangeslice'
+        Sets the operation mode, one of 'insert', 'read', 'rangeslice', or 'indexedrangeslice'
     -n or --num-keys:
         the number of rows to insert/read/slice 
     -d or --nodes:

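For orientation, the new mode is meant to be paired with the -x/--create-index and -C/--cardinality options introduced in stress.py below: an insert run with an index on column C1 writes a bounded, deterministic set of values, and an indexedrangeslice run then pages through the rows matching each value. A minimal sketch of that value scheme (the key count here is illustrative, not part of the patch):

    from hashlib import md5

    cardinality = 50     # default of the new -C/--cardinality option
    total_keys = 1000    # -n/--num-keys; illustrative value

    # mirrors generate_values(): one unique, deterministic value per index
    values = ['%d-%s' % (i, md5(str(i)).hexdigest()) for i in xrange(cardinality)]

    # row i receives values[i % cardinality], so each value appears on roughly
    # total_keys // cardinality rows -- the expected_per_value figure that
    # the new IndexedRangeSlicer pages toward
    print values[0], total_keys // cardinality
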
Modified: cassandra/trunk/contrib/py_stress/stress.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/py_stress/stress.py?rev=1002245&r1=1002244&r2=1002245&view=diff
==============================================================================
--- cassandra/trunk/contrib/py_stress/stress.py (original)
+++ cassandra/trunk/contrib/py_stress/stress.py Tue Sep 28 16:31:10 2010
@@ -65,6 +65,8 @@ parser.add_option('-t', '--threads', typ
                   help="Number of threads/procs to use", default=50)
 parser.add_option('-c', '--columns', type="int", dest="columns",
                   help="Number of columns per key", default=5)
+parser.add_option('-C', '--cardinality', type="int", dest="cardinality",
+                  help="Number of unique values stored in columns", default=50)
 parser.add_option('-d', '--nodes', type="string", dest="nodes",
                   help="Host nodes (comma separated)", default="localhost")
 parser.add_option('-s', '--stdev', type="float", dest="stdev", default=0.1,
@@ -79,7 +81,7 @@ parser.add_option('-m', '--unframed', ac
                   help="use unframed transport")
 parser.add_option('-o', '--operation', type="choice", dest="operation",
                   default="insert", choices=('insert', 'read', 'rangeslice',
-                  'multiget'),
+                  'indexedrangeslice', 'multiget'),
                   help="operation to perform")
 parser.add_option('-u', '--supercolumns', type="int", dest="supers", default=1,
                   help="number of super columns per key")
@@ -98,6 +100,9 @@ parser.add_option('-l', '--replication-f
                   help="replication factor to use when creating needed column families")
 parser.add_option('-e', '--consistency-level', type="str", default='ONE',
                   dest="consistency", help="consistency level to use")
+parser.add_option('-x', '--create-index', type="choice",
+                  choices=('keys','keys_bitmap', 'none'), default='none',
+                  dest="index", help="type of index to create on needed column families")
 
 (options, args) = parser.parse_args()
  
@@ -122,6 +127,13 @@ if consistency is None:
     print "%s is not a valid consistency level" % options.consistency
     sys.exit(3)
 
+# generates a list of unique, deterministic values
+def generate_values():
+    values = []
+    for i in xrange(0, options.cardinality):
+        values.append('%d-%s' % (i, md5(str(i)).hexdigest()))
+    return values
+
 def key_generator_gauss():
     fmt = '%0' + str(len(str(total_keys))) + 'd'
     while True:
@@ -152,7 +164,12 @@ def get_client(host='127.0.0.1', port=91
     return client
 
 def make_keyspaces():
-    cfams = [CfDef(keyspace='Keyspace1', name='Standard1'),
+    colms = []
+    if options.index == 'keys':
+        colms = [ColumnDef(name='C1', validation_class='UTF8Type', index_type=IndexType.KEYS)]
+    elif options.index == 'keys_bitmap':
+        colms = [ColumnDef(name='C1', validation_class='UTF8Type', index_type=IndexType.KEYS_BITMAP)]
+    cfams = [CfDef(keyspace='Keyspace1', name='Standard1', column_metadata=colms),
              CfDef(keyspace='Keyspace1', name='Super1', column_type='Super')]
     keyspace = KsDef(name='Keyspace1', strategy_class='org.apache.cassandra.locator.SimpleStrategy', replication_factor=options.replication, cf_defs=cfams)
     client = get_client(nodes[0], options.port)
@@ -187,8 +204,8 @@ class Operation(Thread):
 
 class Inserter(Operation):
     def run(self):
-        data = md5(str(get_ident())).hexdigest()
-        columns = [Column('C' + str(j), data, time.time() * 1000000) for j in xrange(columns_per_key)]
+        values = generate_values()
+        columns = [Column('C' + str(j), 'unset', time.time() * 1000000) for j in xrange(columns_per_key)]
         fmt = '%0' + str(len(str(total_keys))) + 'd'
         if 'super' == options.cftype:
             supers = [SuperColumn('S' + str(j), columns) for j in xrange(supers_per_key)]
@@ -198,6 +215,10 @@ class Inserter(Operation):
                 cfmap= {key: {'Super1' : [Mutation(ColumnOrSuperColumn(super_column=s)) for s in supers]}}
             else:
                 cfmap = {key: {'Standard1': [Mutation(ColumnOrSuperColumn(column=c)) for c in columns]}}
+            # set the correct column values for this row
+            value = values[i % len(values)]
+            for column in columns:
+                column.value = value
             start = time.time()
             try:
                 self.cclient.batch_mutate(cfmap, consistency)
@@ -308,6 +329,45 @@ class RangeSlicer(Operation):
                 self.opcounts[self.idx] += 1
                 self.keycounts[self.idx] += len(r)
 
+# Each thread queries for a portion of the unique values
+# TODO: all threads start at the same key: implement wrapping, and start
+# from the thread's appointed range
+class IndexedRangeSlicer(Operation):
+    def run(self):
+        fmt = '%0' + str(len(str(total_keys))) + 'd'
+        p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
+        values = generate_values()
+        parent = ColumnParent('Standard1')
+        # the number of rows with a particular value and the number of values we should query for
+        expected_per_value = total_keys // len(values)
+        valuebegin = self.range[0] // expected_per_value
+        valuecount = len(self.range) // expected_per_value
+        for valueidx in xrange(valuebegin, valuebegin + valuecount):
+            received = 0
+            start = fmt % 0
+            value = values[valueidx % len(values)]
+            expressions = [IndexExpression(column_name='C1', op=IndexOperator.EQ, value=value)]
+            while received < expected_per_value:
+                clause = IndexClause(start_key=start, count=options.rangecount, expressions=expressions)
+                begin = time.time()
+                try:
+                    r = self.cclient.get_indexed_slices(parent, clause, p, consistency)
+                    if not r: raise RuntimeError("No indexed values from offset received:", start)
+                except KeyboardInterrupt:
+                    raise
+                except Exception, e:
+                    if options.ignore:
+                        print e
+                        continue
+                    else:
+                        raise
+                received += len(r)
+                # convert max key found back to an integer, and increment it
+                start = fmt % (1 + max([int(keyslice.key) for keyslice in r]))
+                self.latencies[self.idx] += time.time() - begin
+                self.opcounts[self.idx] += 1
+                self.keycounts[self.idx] += len(r)
+
 
 class MultiGetter(Operation):
     def run(self):
@@ -364,6 +424,8 @@ class OperationFactory:
             return Inserter(i, opcounts, keycounts, latencies)
         elif type == 'rangeslice':
             return RangeSlicer(i, opcounts, keycounts, latencies)
+        elif type == 'indexedrangeslice':
+            return IndexedRangeSlicer(i, opcounts, keycounts, latencies)
         elif type == 'multiget':
             return MultiGetter(i, opcounts, keycounts, latencies)
         else:
@@ -427,6 +489,10 @@ class Stress(object):
         threads = self.create_threads('rangeslice')
         self.run_test(options.file,threads);
 
+    def indexedrangeslice(self):
+        threads = self.create_threads('indexedrangeslice')
+        self.run_test(options.file,threads);
+
     def multiget(self):
         threads = self.create_threads('multiget')
         self.run_test(options.file,threads);

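For readers skimming the patch, the core of the new IndexedRangeSlicer is the paging loop around get_indexed_slices: query rows whose indexed column C1 equals a given value, then advance start_key past the largest key returned until the expected number of rows has been seen. A condensed, standalone sketch of that loop follows; the helper name is hypothetical, and client construction and the zero-padded key format are assumed to match stress.py's get_client() and fmt.

    # Hypothetical helper, not part of the patch; the types are the Thrift
    # classes stress.py already imports.
    def page_one_value(client, value, fmt, expected, consistency):
        parent = ColumnParent('Standard1')
        predicate = SlicePredicate(slice_range=SliceRange('', '', False, 5))
        expressions = [IndexExpression(column_name='C1', op=IndexOperator.EQ, value=value)]
        start, received = fmt % 0, 0
        while received < expected:
            clause = IndexClause(start_key=start, count=1000, expressions=expressions)
            rows = client.get_indexed_slices(parent, clause, predicate, consistency)
            if not rows:
                break
            received += len(rows)
            # resume just past the highest key seen in this batch
            start = fmt % (1 + max(int(keyslice.key) for keyslice in rows))
        return received
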

