Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8F9C610CDD for ; Tue, 8 Mar 2016 09:26:42 +0000 (UTC) Received: (qmail 18599 invoked by uid 500); 8 Mar 2016 09:26:41 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 17730 invoked by uid 500); 8 Mar 2016 09:26:41 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 17306 invoked by uid 99); 8 Mar 2016 09:26:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Mar 2016 09:26:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C71B9E0329; Tue, 8 Mar 2016 09:26:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: slebresne@apache.org To: commits@cassandra.apache.org Date: Tue, 08 Mar 2016 09:26:45 -0000 Message-Id: <6d651ebd4334462e80c719a0e6225e75@git.apache.org> In-Reply-To: <4d7124c8ed3a42bf8dff7fdaaa340e09@git.apache.org> References: <4d7124c8ed3a42bf8dff7fdaaa340e09@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [06/23] cassandra git commit: Merge commit 'c3d2f26f46c2d37b6cf918cbb5565fe57a5904cc' into cassandra-2.2 http://git-wip-us.apache.org/repos/asf/cassandra/blob/b74ffeaf/pylib/cqlshlib/copyutil.py ---------------------------------------------------------------------- diff --cc pylib/cqlshlib/copyutil.py index aeb2d0b,cd03765..2755dd5 --- a/pylib/cqlshlib/copyutil.py +++ b/pylib/cqlshlib/copyutil.py @@@ -45,9 -51,13 +51,13 @@@ from cassandra.util import Date, Tim from cql3handling import CqlRuleSet from displaying import NO_COLOR_MAP -from formatting import format_value_default, EMPTY, get_formatter +from formatting import format_value_default, DateTimeFormat, EMPTY, get_formatter from sslhandling import ssl_settings + PROFILE_ON = False + STRACE_ON = False + IS_LINUX = platform.system() == 'Linux' + CopyOptions = namedtuple('CopyOptions', 'copy dialect unrecognized') @@@ -164,13 -252,12 +252,13 @@@ class CopyTask(object) # in the page size or 10 seconds if pagesize is smaller copy_options['pagetimeout'] = int(opts.pop('pagetimeout', max(10, 10 * (copy_options['pagesize'] / 1000)))) copy_options['maxattempts'] = int(opts.pop('maxattempts', 5)) - copy_options['dtformats'] = opts.pop('datetimeformat', shell.display_time_format) + copy_options['dtformats'] = DateTimeFormat(opts.pop('datetimeformat', shell.display_timestamp_format), + shell.display_date_format, shell.display_nanotime_format) copy_options['float_precision'] = shell.display_float_precision - copy_options['chunksize'] = int(opts.pop('chunksize', 1000)) + copy_options['chunksize'] = int(opts.pop('chunksize', 5000)) copy_options['ingestrate'] = int(opts.pop('ingestrate', 100000)) copy_options['maxbatchsize'] = int(opts.pop('maxbatchsize', 20)) - copy_options['minbatchsize'] = int(opts.pop('minbatchsize', 2)) + copy_options['minbatchsize'] = int(opts.pop('minbatchsize', 10)) copy_options['reportfrequency'] = float(opts.pop('reportfrequency', 0.25)) copy_options['consistencylevel'] = shell.consistency_level copy_options['decimalsep'] = opts.pop('decimalsep', '.') @@@ -1392,9 -1593,37 +1594,37 @@@ class ImportConversion(object) self.primary_key_indexes = [self.columns.index(col.name) for col in self.table_meta.primary_key] self.partition_key_indexes = [self.columns.index(col.name) for col in self.table_meta.partition_key] + if statement is None: + self.use_prepared_statements = False + statement = self._get_primary_key_statement(parent, table_meta) + else: + self.use_prepared_statements = True + self.proto_version = statement.protocol_version - self.cqltypes = dict([(c.name, c.type) for c in statement.column_metadata]) - self.converters = dict([(c.name, self._get_converter(c.type)) for c in statement.column_metadata]) + + # the cql types and converters for the prepared statement, either the full statement or only the primary keys + self.cqltypes = [c.type for c in statement.column_metadata] + self.converters = [self._get_converter(c.type) for c in statement.column_metadata] + + # the cql types for the entire statement, these are the same as the types above but + # only when using prepared statements - self.coltypes = [table_meta.columns[name].typestring for name in parent.valid_columns] ++ self.coltypes = [table_meta.columns[name].cql_type for name in parent.valid_columns] + # these functions are used for non-prepared statements to protect values with quotes if required + self.protectors = [protect_value if t in ('ascii', 'text', 'timestamp', 'date', 'time', 'inet') else lambda v: v + for t in self.coltypes] + + @staticmethod + def _get_primary_key_statement(parent, table_meta): + """ + We prepare a query statement to find out the types of the partition key columns so we can + route the update query to the correct replicas. As far as I understood this is the easiest + way to find out the types of the partition columns, we will never use this prepared statement + """ + where_clause = ' AND '.join(['%s = ?' % (protect_name(c.name)) for c in table_meta.partition_key]) + select_query = 'SELECT * FROM %s.%s WHERE %s' % (protect_name(parent.ks), + protect_name(parent.table), + where_clause) + return parent.session.prepare(select_query) def _get_converter(self, cql_type): """ @@@ -1695,67 -2007,88 +2008,88 @@@ class ImportProcess(ChildProcess) self._session.cluster.shutdown() ChildProcess.close(self) - def run_counter(self, table_meta): - """ - Main run method for tables that contain counter columns. - """ - query = 'UPDATE %s.%s SET %%s WHERE %%s' % (protect_name(self.ks), protect_name(self.table)) - - # We prepare a query statement to find out the types of the partition key columns so we can - # route the update query to the correct replicas. As far as I understood this is the easiest - # way to find out the types of the partition columns, we will never use this prepared statement - where_clause = ' AND '.join(['%s = ?' % (protect_name(c.name)) for c in table_meta.partition_key]) - select_query = 'SELECT * FROM %s.%s WHERE %s' % (protect_name(self.ks), protect_name(self.table), where_clause) - conv = ImportConversion(self, table_meta, self.session.prepare(select_query)) - - while True: - batch = self.inmsg.get() - try: - for b in self.split_batches(batch, conv): - self.send_counter_batch(query, conv, b) + def make_params(self): + metadata = self.session.cluster.metadata + table_meta = metadata.keyspaces[self.ks].tables[self.table] + + prepared_statement = None - is_counter = ("counter" in [table_meta.columns[name].typestring for name in self.valid_columns]) ++ is_counter = ("counter" in [table_meta.columns[name].cql_type for name in self.valid_columns]) + if is_counter: + query = 'UPDATE %s.%s SET %%s WHERE %%s' % (protect_name(self.ks), protect_name(self.table)) + make_statement = self.wrap_make_statement(self.make_counter_batch_statement) + elif self.use_prepared_statements: + query = 'INSERT INTO %s.%s (%s) VALUES (%s)' % (protect_name(self.ks), + protect_name(self.table), + ', '.join(protect_names(self.valid_columns),), + ', '.join(['?' for _ in self.valid_columns])) + + query = self.session.prepare(query) + query.consistency_level = self.consistency_level + prepared_statement = query + make_statement = self.wrap_make_statement(self.make_prepared_batch_statement) + else: + query = 'INSERT INTO %s.%s (%s) VALUES (%%s)' % (protect_name(self.ks), + protect_name(self.table), + ', '.join(protect_names(self.valid_columns),)) + make_statement = self.wrap_make_statement(self.make_non_prepared_batch_statement) - except Exception, exc: - self.outmsg.put((batch, '%s - %s' % (exc.__class__.__name__, exc.message))) - if self.debug: - traceback.print_exc(exc) + conv = ImportConversion(self, table_meta, prepared_statement) + tm = TokenMap(self.ks, self.hostname, self.local_dc, self.session) + return query, conv, tm, make_statement - def run_normal(self, table_meta): + def inner_run(self, query, conv, tm, make_statement): """ - Main run method for normal tables, i.e. tables that do not contain counter columns. + Main run method. Note that we bind self methods that are called inside loops + for performance reasons. """ - query = 'INSERT INTO %s.%s (%s) VALUES (%s)' % (protect_name(self.ks), - protect_name(self.table), - ', '.join(protect_names(self.valid_columns),), - ', '.join(['?' for _ in self.valid_columns])) + self.query = query + self.conv = conv + self.make_statement = make_statement - query_statement = self.session.prepare(query) - query_statement.consistency_level = self.consistency_level - conv = ImportConversion(self, table_meta, query_statement) + convert_rows = self.convert_rows + split_into_batches = self.split_into_batches + result_callback = self.result_callback + err_callback = self.err_callback + session = self.session while True: - batch = self.inmsg.get() + chunk = self.inmsg.recv() + if chunk is None: + break + try: - for b in self.split_batches(batch, conv): - self.send_normal_batch(conv, query_statement, b) + chunk['rows'] = convert_rows(conv, chunk) + for replicas, batch in split_into_batches(chunk, conv, tm): + statement = make_statement(query, conv, chunk, batch, replicas) + future = session.execute_async(statement) + future.add_callbacks(callback=result_callback, callback_args=(batch, chunk), + errback=err_callback, errback_args=(batch, chunk, replicas)) except Exception, exc: - self.outmsg.put((batch, '%s - %s' % (exc.__class__.__name__, exc.message))) - if self.debug: - traceback.print_exc(exc) + self.report_error(exc, chunk, chunk['rows']) - def send_counter_batch(self, query_text, conv, batch): - if self.test_failures and self.maybe_inject_failures(batch): - return + def wrap_make_statement(self, inner_make_statement): + def make_statement(query, conv, chunk, batch, replicas): + try: + return inner_make_statement(query, conv, batch, replicas) + except Exception, exc: + print "Failed to make batch statement: {}".format(exc) + self.report_error(exc, chunk, batch['rows']) + return None - error_rows = [] - batch_statement = BatchStatement(batch_type=BatchType.COUNTER, consistency_level=self.consistency_level) + def make_statement_with_failures(query, conv, chunk, batch, replicas): + failed_batch = self.maybe_inject_failures(batch) + if failed_batch: + return failed_batch + return make_statement(query, conv, chunk, batch, replicas) - for r in batch['rows']: - row = self.filter_row_values(r) - if len(row) != len(self.valid_columns): - error_rows.append(row) - continue + return make_statement_with_failures if self.test_failures else make_statement + def make_counter_batch_statement(self, query, conv, batch, replicas): + statement = BatchStatement(batch_type=BatchType.COUNTER, consistency_level=self.consistency_level) + statement.replicas = replicas + statement.keyspace = self.ks + for row in batch['rows']: where_clause = [] set_clause = [] for i, value in enumerate(row):