cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Adam Holmberg (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CASSANDRA-9302) Optimize cqlsh COPY FROM, part 3
Date Tue, 08 Dec 2015 17:24:11 GMT

    [ https://issues.apache.org/jira/browse/CASSANDRA-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15047094#comment-15047094
] 

Adam Holmberg commented on CASSANDRA-9302:
------------------------------------------

[~Stefania] my standard disclaimer when reviewing with someone new: I write down almost everything
I note during a review. Some of it can be nit-picky -- feel free to take it or leave it. I
also welcome dialog or feedback about what kind of inputs you want for future reviews

I looked at cassandra-2.2...stef1927:9302-2.2 @ 8c52ae93
---------------------------------------
*bin/cqlsh.py*
COPY_OPTIONS could be composed of common options and to/from options. To/from options used
by name in complete_copy_options.

Since COPY_OPTIONS is always converted to a set for completion, I suggest just making it a
set.

COPY help: move MAXATTEMPTS, REPORTFREQUENCY above TO/FROM groups?

JOBS='6'               - the number of jobs each process can work on at a time
                       ^ alignment problem
It is not clear what "jobs" means in this context, and how the user might want to set this

*copy.ImportReader.read_rows*
Mostly unnecessary to preallocate the rows list, since this loop is dominated by other things.
If you wanted to simplify
and get rid of that loop you could write
{code}
rows = list(next(self.reader) for _ in xrange(self.chunksize))
self.exhausted = len(rows) < chunksize
return rows
{code}

*copy.ImportTask.process_records*
"(batches ore retries)" (typo)

*copy.ImportTask.receive:*
"start_time < 0.1):  # 10 millis" incorrect comment

*copy.ImportTask.receive:*
"except Queue.Empty": what expectation do you have for this queue to be empty if we're testing
"not self.inmsg.empty()" in the loop?

*copy.ImportTask.receive:*
"if err.startswith('ValueError') or err.startswith('TypeError')": may want to add IndexError
so this type of error is not retried:
{code}
cassandra@cqlsh:test> copy t2 (a,b,c,d) FROM 'f.csv';

Starting copy of test.t2 with columns ['a', 'b', 'c', 'd'].
Failed to import 7 rows: IndexError - list index out of range -  will retry later, attempt
1 of 5
Failed to import 7 rows: IndexError - list index out of range -  will retry later, attempt
2 of 5
Failed to import 7 rows: IndexError - list index out of range -  will retry later, attempt
3 of 5
Failed to import 7 rows: IndexError - list index out of range -  will retry later, attempt
4 of 5
Failed to import 7 rows: IndexError - list index out of range -  given up after 5 attempts
Failed to process 7 batches
Processed 0 rows; Written: 0.000000 rows/s
0 rows imported in 0.096 seconds.
{code}

*copy.ImportTask.reset_batch:*
"return dict([(k, v) if k != 'imported' else (k, 0) for k, v in batch.iteritems()])" -->
batch['imported'] = 0 (?)

*copy.ImportTask.batch_id* not used: looks like this is only written, never read
If this is removed, send_batch can accept rows directly and new_batch and make_batch can go
away.

*copy.ImportTask.receive_meter*: seems like overkill since it's ultimately just used to track
the same total as ImportTask.succeeded

*copy.ImportProcess.sessions:*
I see that we're starting a whitelist session per local host. This could lead to quite a bit
of overhead for larger clusters (sessions = nodes * worker processes).
bq. manual replica selection rather than relying on the driver TAR - we get better performance
because we can batch by replica, not just by primary key.
I understand grouping by replica, but I don't understand why driver TAR is not effective here.
To my way of thinking we should be able to have a single session per worker (token-aware,
DC-aware), group rows in a batch message by replica (same way it's done presently), and let
the driver TAR take it from there.

*copy.ImportProcess.get_replica*
Since you're randomizing here, doesn't that decimate some of the split batchability within
each batch? We might be better off randomizing a start index across processes, then round-robin
replicas per batch.

*copy.ImportProcess.execute_statement:**
"callback=partial(self.result_callback, batch)," no need to make partials because the add_callbacks
method accepts callback_args
to be passed through.

*copy.ImportProcess.split_batches:*
"if replica in rows_by_replica:" could eliminate conditional by using rows_by_replica = defaultdict(list)
"if len(remaining_rows) > 0:" --> if remaining_rows
>From the [style guide|https://www.python.org/dev/peps/pep-0008/]:
{quote}
For sequences, (strings, lists, tuples), use the fact that empty sequences are false.

Yes: if not seq:
     if seq:
{quote}
(If you choose to do this there are several call sites to be tweaked in this changeset)

*copy.ImportProcess.get_replica:*
"TODO: DC locality" leftover todo

*General:*
Did you think at all about just constructing simple string queries instead of prepared? Python
can use a lot of cycles doing conversions like this. Right now we have string-->Python
type-->cql serialized. If we built strings from our input (something Python is usually
very efficient doing) it would require less conversions for the bulk of the data (we would
still need to convert PK columns for getting the token). Obviously there is always the alternative
"more workers", but it's something to consider for general efficiency.

> Optimize cqlsh COPY FROM, part 3
> --------------------------------
>
>                 Key: CASSANDRA-9302
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-9302
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Tools
>            Reporter: Jonathan Ellis
>            Assignee: Stefania
>            Priority: Critical
>             Fix For: 2.1.x
>
>
> We've had some discussion moving to Spark CSV import for bulk load in 3.x, but people
need a good bulk load tool now.  One option is to add a separate Java bulk load tool (CASSANDRA-9048),
but if we can match that performance from cqlsh I would prefer to leave COPY FROM as the preferred
option to which we point people, rather than adding more tools that need to be supported indefinitely.
> Previous work on COPY FROM optimization was done in CASSANDRA-7405 and CASSANDRA-8225.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message