incubator-cassandra-user mailing list archives

From Marcelo Elias Del Valle <marc...@s1mbi0se.com.br>
Subject Re: python fast table copy/transform (subject updated)
Date Fri, 06 Jun 2014 18:25:00 GMT
Hi Michael,

I just executed the command:

nohup ./bm_copy.py -w 40  --concurrency 40 -t 900000 &> /tmp/test.out &

Using your new version.

I changed the following part of your script:


        self._stmt = {
            'source': u""" -- these ?'s will be filled in automatically
                SELECT *
                FROM identification.entitylookup
                WHERE TOKEN(name) > ? AND TOKEN(name) <= ?
            """,
            'dest': u"""  -- these ?'s must be mapped by 'map_fields' below
                insert into identification.entity_lookup_test (name, value,
entity_id) values(?, ?, ?)
            """
        }

    def map_fields(self, source_row):
        logger.debug("")

        return (  # return a tuple in the order of the dest ?'s above
            source_row.name,
            source_row.value,
            source_row.entity_id
        )

However, it didn't feel fast enough, probably because of the limited CPU
of a single machine.

I had 2 nodes in this cluster when I sent my first message, but now the
cluster has 5 nodes, each with a 250 GB SSD and 64 GB of RAM.

I am running the script on one of the cluster machines, but although all
cores show CPU close to 100% (machine config = Intel Xeon E5-1620v2 4c/8t
3.7 GHz+/3.9 GHz+), the cluster doesn't seem to be affected:
[image: inline image 1]

Is there any way to make your script run on multiple machines at the same
time?  I was analyzing your source to understand better how you query the
ranges... I was even thinking of using something similar to process only
the current node's range and running the script on each machine in the
cluster...
Do you know any way of querying the token range(s) of the current physical
node?
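
For the per-node range idea, I was imagining something like the sketch
below: each node's own tokens come from "SELECT tokens FROM system.local"
and the rest of the ring from "SELECT tokens FROM system.peers", and then
a node is primary for the ranges that end at a token it owns. This is just
my own sketch (the function name and toy values are mine, not from your
gist):

```python
def node_ranges(ring_tokens, node_tokens):
    """Return the (start, end] token ranges a node is primary for.

    ring_tokens: every token in the ring, e.g. collected via
        SELECT tokens FROM system.local   -- this node's tokens
        SELECT tokens FROM system.peers   -- everyone else's
    node_tokens: the subset owned by the node we want ranges for.
    Murmur3 tokens are signed 64-bit integers."""
    ring = sorted(ring_tokens)
    owned = set(node_tokens)
    ranges = []
    for i, token in enumerate(ring):
        if token in owned:
            # each token ends the range starting at the previous ring token;
            # index -1 wraps around, matching the ring's circular layout
            ranges.append((ring[i - 1], token))
    return ranges

# toy ring: 4 tokens split between two nodes
ring = [-100, -10, 10, 100]
print(node_ranges(ring, [-10, 100]))  # [(-100, -10), (10, 100)]
```

With vnodes each node owns many tokens, so the resulting range list could
feed the same per-range SELECT your workers already issue.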

Also, could it print an "average rows/sec" from time to time? I will try
to add it myself...
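
Something along these lines is what I have in mind - just a sketch of
mine, not from your script, and the names are made up:

```python
import time

class RateLogger(object):
    """Accumulates a row count and prints the cumulative average
    rows/sec at most once every `interval` seconds."""

    def __init__(self, interval=10.0):
        self.start = self.last_log = time.time()
        self.interval = interval
        self.rows = 0

    def avg_rate(self):
        elapsed = time.time() - self.start
        return self.rows / elapsed if elapsed > 0 else 0.0

    def add(self, n=1):
        self.rows += n
        now = time.time()
        if now - self.last_log >= self.interval:
            self.last_log = now
            print("%d rows, %.0f rows/sec average" % (self.rows, self.avg_rate()))

# e.g. call rate.add(fetch_size) from each paging callback
rate = RateLogger(interval=0.0)  # interval=0 prints on every add, for demo
rate.add(1000)
```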

Also, is the row_count output the number of rows per process or the sum
across all processes? Sometimes it repeats the same number:
INFO       2014-06-06 14:05:17,220 __main__                       6780
update_or_finish                     175 : page: 61; row_count: 61001
INFO       2014-06-06 14:05:17,346 __main__                       6745
update_or_finish                     175 : page: 64; row_count: 64001
INFO       2014-06-06 14:05:17,376 __main__                       6806
update_or_finish                     175 : page: 64; row_count: 64001
I am guessing this happens when there is nothing in a range taken by a
task...

I will keep trying here and will try to keep you up to date as well.

Best regards,
Marcelo.


2014-06-06 13:25 GMT-03:00 Laing, Michael <michael.laing@nytimes.com>:

> Hi Marcelo,
>
> I have updated the prerelease app in this gist:
>
> https://gist.github.com/michaelplaing/37d89c8f5f09ae779e47
>
> I found that it was too easy to overrun my Cassandra clusters so I added a
> throttle arg which by default is 1000 rows per second.
>
> Fixed a few bugs too, reworked the args, etc.
>
> I'll be interested to hear if you find it useful and/or have any comments.
>
> ml
>
>
> On Thu, Jun 5, 2014 at 1:09 PM, Marcelo Elias Del Valle <
> marcelo@s1mbi0se.com.br> wrote:
>
>> Michael,
>>
>> I will try to test it by tomorrow and I will let you know all the
>> results.
>>
>> Thanks a lot!
>>
>> Best regards,
>> Marcelo.
>>
>>
>> 2014-06-04 22:28 GMT-03:00 Laing, Michael <michael.laing@nytimes.com>:
>>
>> BTW you might want to put a LIMIT clause on your SELECT for testing. -ml
>>>
>>>
>>> On Wed, Jun 4, 2014 at 6:04 PM, Laing, Michael <
>>> michael.laing@nytimes.com> wrote:
>>>
>>>> Marcelo,
>>>>
>>>> Here is a link to the preview of the python fast copy program:
>>>>
>>>> https://gist.github.com/michaelplaing/37d89c8f5f09ae779e47
>>>>
>>>> It will copy a table from one cluster to another with some
>>>> transformation; the source and destination can be the same cluster.
>>>>
>>>> It has 3 main throttles to experiment with:
>>>>
>>>>    1. fetch_size: size of source pages in rows
>>>>    2. worker_count: number of worker subprocesses
>>>>    3. concurrency: number of async callback chains per worker
>>>>    subprocess
>>>>
>>>> It is easy to overrun Cassandra and the python driver, so I recommend
>>>> starting with the defaults: fetch_size: 1000; worker_count: 2; concurrency:
>>>> 10.
>>>>
>>>> Additionally there are switches to set 'policies' by source and
>>>> destination: retry (downgrade consistency), dc_aware, and token_aware.
>>>> retry is useful if you are getting timeouts. For the others YMMV.
>>>>
>>>> To use it you need to define the SELECT and UPDATE cql statements as
>>>> well as the 'map_fields' method.
>>>>
>>>> The worker subprocesses divide up the token range among themselves and
>>>> proceed quasi-independently. Each worker opens a connection to each cluster
>>>> and the driver sets up connection pools to the nodes in the cluster. Anyway
>>>> there are a lot of processes, threads, callbacks going at once so it is fun
>>>> to watch.
>>>>
>>>> On my regional cluster of small nodes in AWS I got about 3000 rows per
>>>> second transferred after things warmed up a bit - each row about 6kb.
>>>>
>>>> ml
>>>>
>>>>
>>>> On Wed, Jun 4, 2014 at 11:49 AM, Laing, Michael <
>>>> michael.laing@nytimes.com> wrote:
>>>>
>>>>> OK Marcelo, I'll work on it today. -ml
>>>>>
>>>>>
>>>>> On Tue, Jun 3, 2014 at 8:24 PM, Marcelo Elias Del Valle <
>>>>> marcelo@s1mbi0se.com.br> wrote:
>>>>>
>>>>>> Hi Michael,
>>>>>>
>>>>>> For sure I would be interested in this program!
>>>>>>
>>>>>> I am new both to Python and to CQL. I started creating this copier,
>>>>>> but was having problems with timeouts. Alex solved my problem here on
>>>>>> the list, but I think I will still have a lot of trouble making the
>>>>>> copy work well.
>>>>>>
>>>>>> I open sourced my version here:
>>>>>> https://github.com/s1mbi0se/cql_record_processor
>>>>>>
>>>>>> Just in case it's useful for anything.
>>>>>>
>>>>>> However, I saw the CQL driver has support for concurrency itself, and
>>>>>> having something made by someone who knows the Python CQL driver
>>>>>> better would be very helpful.
>>>>>>
>>>>>> My two servers today are at OVH (ovh.com); we have servers at AWS,
>>>>>> but in several cases we prefer other hosts. Both servers have SSDs
>>>>>> and 64 GB of RAM; I could use the script as a benchmark for you if
>>>>>> you want. Besides, we have some bigger clusters; I could run it on
>>>>>> them just to test the speed if that would help.
>>>>>>
>>>>>> Regards
>>>>>> Marcelo.
>>>>>>
>>>>>>
>>>>>> 2014-06-03 11:40 GMT-03:00 Laing, Michael <michael.laing@nytimes.com>:
>>>>>>
>>>>>> Hi Marcelo,
>>>>>>>
>>>>>>> I could create a fast copy program by repurposing some python apps
>>>>>>> that I am using for benchmarking the python driver - do you still
>>>>>>> need this?
>>>>>>>
>>>>>>> With high levels of concurrency and multiple subprocess workers,
>>>>>>> based on my current actual benchmarks, I think I can get well over
>>>>>>> 1,000 rows/second on my mac and significantly more in AWS. I'm
>>>>>>> using variable size rows averaging 5kb.
>>>>>>>
>>>>>>> This would be the initial version of a piece of the benchmark suite
>>>>>>> we will release as part of our nyt⨍aбrik project on 21 June for my
>>>>>>> Cassandra Day NYC talk re the python driver.
>>>>>>>
>>>>>>> ml
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jun 2, 2014 at 2:15 PM, Marcelo Elias Del Valle <
>>>>>>> marcelo@s1mbi0se.com.br> wrote:
>>>>>>>
>>>>>>>> Hi Jens,
>>>>>>>>
>>>>>>>> Thanks for trying to help.
>>>>>>>>
>>>>>>>> Indeed, I know I can't do it using just CQL. But what would you
>>>>>>>> use to migrate data manually? I tried to create a python program
>>>>>>>> using auto paging, but I am getting timeouts. I also tried Hive,
>>>>>>>> but had no success.
>>>>>>>> I only have two nodes and less than 200 GB in this cluster; any
>>>>>>>> simple way to extract the data quickly would be good enough for me.
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Marcelo.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 2014-06-02 15:08 GMT-03:00 Jens Rantil <jens.rantil@tink.se>:
>>>>>>>>
>>>>>>>> Hi Marcelo,
>>>>>>>>>
>>>>>>>> Looks like you can't do this without migrating your data manually:
>>>>>>>>> https://stackoverflow.com/questions/18421668/alter-cassandra-column-family-primary-key-using-cassandra-cli-or-cql
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Jens
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Jun 2, 2014 at 7:48 PM, Marcelo Elias Del Valle <
>>>>>>>>> marcelo@s1mbi0se.com.br> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I have some cql CFs in a 2 node Cassandra 2.0.8 cluster.
>>>>>>>>>>
>>>>>>>>>> I realized I created my column family with the wrong partition
>>>>>>>>>> key. Instead of:
>>>>>>>>>>
>>>>>>>>>> CREATE TABLE IF NOT EXISTS entity_lookup (
>>>>>>>>>>   name varchar,
>>>>>>>>>>   value varchar,
>>>>>>>>>>   entity_id uuid,
>>>>>>>>>>   PRIMARY KEY ((name, value), entity_id))
>>>>>>>>>> WITH
>>>>>>>>>>     caching=all;
>>>>>>>>>>
>>>>>>>>>> I used:
>>>>>>>>>>
>>>>>>>>>> CREATE TABLE IF NOT EXISTS entitylookup (
>>>>>>>>>>   name varchar,
>>>>>>>>>>   value varchar,
>>>>>>>>>>   entity_id uuid,
>>>>>>>>>>   PRIMARY KEY (name, value, entity_id))
>>>>>>>>>> WITH
>>>>>>>>>>     caching=all;
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Now I need to migrate the data from the second CF to the first
>>>>>>>>>> one.
>>>>>>>>>> I am using DataStax Community Edition.
>>>>>>>>>>
>>>>>>>>>> What would be the best way to convert data from one CF to the
>>>>>>>>>> other?
>>>>>>>>>>
>>>>>>>>>> Best regards,
>>>>>>>>>> Marcelo.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
