cassandra-user mailing list archives

From Gabriel Menegatti <gabr...@simbioseventures.com>
Subject Problems with unbalanced write requests using TokenAware policy
Date Tue, 11 Nov 2014 19:58:00 GMT
Hello,

We are facing a "difficult" problem when trying to use the TokenAware
policy for writes: our write requests are always unbalanced for some
unknown reason. Maybe we have something wrong in our data model?

We are trying to use Token Aware to send the writes because, as far as we
understand, the objective of the Token Aware policy is to send the write
request directly to the node that owns the data according to the
token/routing key, so the request doesn't need to be received by a
coordinator and then re-routed to the right node. Is our understanding
correct?
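
Just to make explicit what we think should happen (a minimal standalone
sketch, contact point and values are made up, not our real code): as far as
we know the driver can only route token-aware when it manages to compute a
routing key from the bound partition key columns, and if routing_key comes
back as None, TokenAwarePolicy simply falls back to its child policy.

from cassandra.cluster import Cluster
from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy

# Made-up contact point, same policy wiring as in our code further below
cluster = Cluster(['10.0.0.1'],
                  load_balancing_policy=TokenAwarePolicy(
                      DCAwareRoundRobinPolicy(local_dc='DC1')))
session = cluster.connect('identificationmr')

prepared = session.prepare(
    "INSERT INTO user_data (i_name, i_value, type, creation_key, data) "
    "VALUES (?, ?, ?, ?, ?)")
bound = prepared.bind(('email', 'foo@bar.com', 1, 123456789, '{}'))

# If the driver computed the routing key from (i_name, i_value) this prints
# the packed key; None would mean the child policy alone picks the host.
print(bound.routing_key)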

We have a cluster with 2 DCs, where DC1 has 30 servers and DC2 has 10 servers
(but the same storage capacity as DC1). We are using DSE Community 2.1.1
and the DataStax Python Driver 2.1.2.

Our configurations are:
- Murmur3Partitioner
- vnodes (num_tokens: 256)
- NetworkTopologyStrategy

The keyspace is called "identificationmr" and the replication
factor/strategy used is 1 for DC1 and 2 for DC2.

We are performing all the writes using consistency level ONE (not
LOCAL_ONE), using the DC1 nodes as seeds.

If we take a look at the data distribution (image below), everything seems
to be working well and everything is balanced:

[image: data distribution across the nodes]



*So the problem is:*
We tried writing data using several load balancing policies to see what is
going on, but so far we have no conclusions and the write requests remain
unbalanced.

1) When we try to write data to the column family "user_data" or
"user_data_idx" using the token aware policy, for some reason one specific
node receives most of the write requests (maybe because it is somehow acting
as coordinator, even though it should not), see image below:

[image: write request distribution per node when using the token aware policy]
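
To be explicit about what we mean by "it should not": for any bound insert,
the driver's token map can tell us which hosts own that partition, and with
token-aware routing those are the nodes we would expect to coordinate the
write (sketch only, contact point and values are made up):

from cassandra.cluster import Cluster

cluster = Cluster(['10.0.0.1'])  # made-up DC1 contact point
session = cluster.connect('identificationmr')

prepared = session.prepare(
    "INSERT INTO user_data_idx (creation_key, page_number, i_name, i_value) "
    "VALUES (?, ?, ?, ?)")
bound = prepared.bind((123456789, 1, 'email', 'foo@bar.com'))

# Hosts that own this partition according to the driver's token map
for host in cluster.metadata.get_replicas('identificationmr', bound.routing_key):
    print(host.address, host.datacenter)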


2) If we try to make the writes using the DC Aware Round Robin policy,
everything is "equally distributed", but a few minutes later the
cluster/nodes start to time out, probably because all the nodes are
acting as coordinators and re-routing write requests to other nodes
all the time. Since our replication factor is 1 and we have 30 servers, for
every 100 write requests we send to a node only around 3 remain on the
node that received them; the rest need to be re-routed.
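
(That 3% figure is simply RF divided by the number of DC1 nodes, i.e. the
chance that a randomly chosen coordinator also owns the partition:

nodes, rf = 30, 1
print("%.1f%% of writes stay on the coordinator" % (100.0 * rf / nodes))  # ~3.3%

)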


*Our schema:*
*- identificationmr.user_data:*

CREATE TABLE identificationmr.user_data (
    i_name text,
    i_value text,
    type int,
    creation_key bigint,
    data text,
    identifiers text,
    on_boarded boolean,
    origin text,
    prefix text,
    sub_prefix text,
    PRIMARY KEY ((i_name, i_value), type, creation_key, data)
) WITH CLUSTERING ORDER BY (type ASC, creation_key ASC, data ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
    AND comment = ''
    AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';


*- identificationmr.user_data_idx:*

CREATE TABLE identificationmr.user_data_idx (
    creation_key bigint,
    page_number int,
    i_name text,
    i_value text,
    PRIMARY KEY ((creation_key, page_number), i_name, i_value)
) WITH CLUSTERING ORDER BY (i_name ASC, i_value ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
    AND comment = ''
    AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';


*Python code used to perform the write:*
        # CQL templates for the prepared statements
        self.INSERT_USER_DATA = "INSERT INTO " + self.USER_DATA + " (i_name, i_value, creation_key, type, data, prefix, sub_prefix, origin, identifiers, on_boarded) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ;"
        self.INSERT_USER_QUEUE_REDUCER = "INSERT INTO " + self.USER_QUEUE_REDUCER + "{jn} (i_name, i_value, start_key, end_key) VALUES (?, ?, ?, ?) ;"
        self.INSERT_USER_QUEUE_INDEXER = "INSERT INTO " + self.USER_QUEUE_INDEXER + "{jn} (i_name, i_value, start_key, end_key) VALUES (?, ?, ?, ?) ;"
        self.INSERT_USER_DATA_IDX = "INSERT INTO " + self.USER_DATA_IDX + " (creation_key, page_number, i_name, i_value) VALUES (?, ?, ?, ?) ;"

        keyspace = cfg.identification_keyspace

        self.default_consistency_level = ConsistencyLevel.ONE
        global cluster, session
        if not cluster or (session and session.is_shutdown):
            logging.info("Connecting to cluster %s/%s" % (cfg.identification_hosts, cfg.identification_keyspace))
            if "localhost" in cfg.identification_hosts:
                # Local/dev setup: plain round robin wrapped in token awareness, QUORUM
                load_balancing_policy = TokenAwarePolicy(RoundRobinPolicy())
                self.default_consistency_level = ConsistencyLevel.QUORUM
                logging.info("Using load_balancing_policy = TokenAwarePolicy(RoundRobinPolicy())")
            else:
                # Production: token aware on top of DC-aware round robin, pinned to DC1, CL ONE
                self.default_consistency_level = ConsistencyLevel.ONE
                load_balancing_policy = TokenAwarePolicy(DCAwareRoundRobinPolicy('DC1'))
                logging.info("Using load_balancing_policy = TokenAwarePolicy(DCAwareRoundRobinPolicy('DC1'))")
                # load_balancing_policy = DCAwareRoundRobinPolicy('DC1')
                # logging.info("Using load_balancing_policy = DCAwareRoundRobinPolicy('DC1')")
            cluster = get_cluster(json.loads(cfg.identification_hosts),
                                  port=int(cfg.identification_port),
                                  control_connection_timeout=100000,
                                  reconnection_policy=Cluster.reconnection_policy,
                                  load_balancing_policy=load_balancing_policy)
        if not session or session.is_shutdown:
            logging.info("Connecting to keyspace %s/%s" % (cfg.identification_hosts, cfg.identification_keyspace))
            session = cluster.connect(keyspace)
        self.session = session

        self.session.default_consistency_level = self.default_consistency_level
        self.session.default_timeout = 100000
        self.write_on_queues = write_on_queues
        self.max_batch_size = max_batch_size

        self.PREPARED_INSERT_USER_DATA = self.session.prepare(self.INSERT_USER_DATA)
        self.PREPARED_INSERT_USER_DATA_IDX = self.session.prepare(self.INSERT_USER_DATA_IDX)
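
The writes themselves are executed roughly like this (simplified, values are
made up); the full partition key (i_name, i_value) is always bound, so as far
as we understand the driver has everything it needs to compute the routing key:

        # Simplified/illustrative execution of the prepared insert; the full
        # partition key (i_name, i_value) is bound on every call.
        bound = self.PREPARED_INSERT_USER_DATA.bind(
            ('email', 'foo@bar.com', 123456789, 1, '{}',
             'pre', 'sub', 'origin', '{}', False))
        self.session.execute(bound)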


*On the link below it's possible to find the results for:*
- nodetool info
- nodetool cfstats identificationmr.user_data
- nodetool cfstats identificationmr.user_data_idx
- nodetool proxyhistograms
- nodetool cfhistograms identificationmr.user_data
- nodetool cfhistograms identificationmr.user_data_idx

Link:
https://www.dropbox.com/sh/x1xb53aomzit0ov/AAAVOOAhyDFfpA3zr8AtOCTWa?dl=0


Any help or hints would be much appreciated.

Sorry for the long/detailed email, and thanks in advance.

Regards,
Gabriel.
