cassandra-user mailing list archives

From Gabriel Menegatti <gabr...@simbioseventures.com>
Subject Re: Problems with unbalanced write requests using TokenAware policy
Date Wed, 12 Nov 2014 01:09:12 GMT
Hi Karl,

Thanks so much for your reply.

But considering that we have only 1 replica (RF 1) on DC1, this would
probably not affect our case, since there is just 1 healthy replica to
choose from, right?

Regards,
Gabriel.

On Tue, Nov 11, 2014 at 10:09 PM, Karl Rieb <karl.rieb@gmail.com> wrote:

> I know you are using the python-driver, but the Java driver recently fixed
> an issue with this:
>
>   https://datastax-oss.atlassian.net/browse/JAVA-504
>
> Looking at the python-driver source code, it appears to have the same
> bug, where it always returns the first healthy replica:
>
>
> https://github.com/datastax/python-driver/blob/422f2d53f90d8817af11e3043e16a542352b63c4/cassandra/policies.py#L366
>
> The Java driver now shuffles the replicas before returning them, so as to
> distribute the load better.  You may want to open up an issue with the
> python-driver or at least send a message to their mailing list.
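>
> Until something like that lands in the python-driver, one workaround might
> be to wrap the policy yourself. A rough sketch of the idea (illustrative
> only, not the driver's implementation; the _child_policy and
> _cluster_metadata attribute names come from the source file linked above
> and may differ between driver versions):
>
> import random
> from cassandra.policies import TokenAwarePolicy, HostDistance
>
> class ShufflingTokenAwarePolicy(TokenAwarePolicy):
>     # Yield the healthy, local replicas for the routing key in random
>     # order, then fall back to the child policy's plan for the rest.
>     def make_query_plan(self, working_keyspace=None, query=None):
>         keyspace = (query.keyspace if query and query.keyspace
>                     else working_keyspace)
>         child = self._child_policy
>         if query is None or query.routing_key is None or keyspace is None:
>             for host in child.make_query_plan(keyspace, query):
>                 yield host
>             return
>
>         replicas = list(self._cluster_metadata.get_replicas(
>             keyspace, query.routing_key))
>         random.shuffle(replicas)
>         for replica in replicas:
>             if replica.is_up and child.distance(replica) == HostDistance.LOCAL:
>                 yield replica
>
>         # non-replica hosts, in the child policy's order
>         for host in child.make_query_plan(keyspace, query):
>             if host not in replicas:
>                 yield host
>
> You could then pass ShufflingTokenAwarePolicy(DCAwareRoundRobinPolicy('DC1'))
> as the load_balancing_policy instead of the plain TokenAwarePolicy.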
>
> -Karl
>
>
>
> On Tue, Nov 11, 2014 at 3:14 PM, Gabriel Menegatti <gabriel@s1mbi0se.com.br> wrote:
>
>> Hello,
>>
>> We are facing a "difficult" problem when trying to use the TokenAware
>> policy for writes: our write requests are always unbalanced for some
>> unknown reason. Maybe we have something wrong in our data model?
>>
>> We are trying to use TokenAware to send the writes because, as far as we
>> understand, the objective of the TokenAware policy is to send the write
>> request directly to the node that owns the data according to the
>> token/routing_key, so that the request doesn't need to be received by a
>> coordinator and then re-routed to the right node. Is our understanding
>> right?
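>>
>> As a sanity check on that understanding, the driver itself can tell us
>> which node owns a given partition key. A minimal sketch (the contact
>> point and the bound values below are just placeholders):
>>
>> from cassandra.cluster import Cluster
>>
>> cluster = Cluster(["10.0.0.1"])  # placeholder contact point
>> session = cluster.connect("identificationmr")
>>
>> insert = session.prepare(
>>     "INSERT INTO user_data (i_name, i_value, type, creation_key, data) "
>>     "VALUES (?, ?, ?, ?, ?)")
>> bound = insert.bind(("email", "user@example.com", 1, 1415750000000, "{}"))
>>
>> # bound.routing_key is derived from the partition key columns
>> # (i_name, i_value); the token-aware policy uses it to pick the owner.
>> replicas = cluster.metadata.get_replicas("identificationmr", bound.routing_key)
>> print([host.address for host in replicas])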
>>
>> We have a cluster with 2 DCs, where DC1 has 30 servers and DC2 has 10
>> servers (but with the same storage capacity as DC1). We are using DSE
>> Community 2.1.1 and the DataStax Python Driver 2.1.2.
>>
>> Our configurations are:
>> - Murmur3Partitioner
>> - vnodes (num_tokens: 256)
>> - NetworkTopologyStrategy
>>
>> The keyspace is called "identificationmr" and the replication factor
>> used is 1 for DC1 and 2 for DC2.
>>
>> We are performing all the writes with consistency level ONE (not
>> LOCAL_ONE), using the DC1 nodes as seeds.
>>
>> If we take a look at the data distribution, everything seems to be
>> working well and everything is balanced (see image below):
>>
>> [image: Inline image 1]
>>
>>
>>
>> *So the problem is:*
>> We tried writing data using several load balancing policies to see what
>> is going on, but so far we have no conclusions and all the write requests
>> remain unbalanced.
>>
>> 1) When we try to write data to the column family "user_data" or
>> "user_data_idx" using the token aware policy, for some reason one specific
>> node receives most of the write requests (maybe because it is somehow
>> acting as coordinator, even though it should not), see image below:
>>
>> [image: Inline image 3]
>>
>>
>> 2) If we try to make the writes using the DC Aware Round Robin policy,
>> everything is "equally distributed", but a few minutes later the
>> cluster/nodes start to give timeouts, probably because all the nodes are
>> acting as coordinators and re-routing the write requests to other nodes
>> all the time. Since our replication factor is 1 and we have 30 servers, for
>> every 100 write requests we send to a node, around 3 of them remain on the
>> node that received the request and the rest need to be re-routed.
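>>
>> As a rough sanity check on those numbers (back-of-the-envelope only):
>>
>> # With RF=1 and 30 nodes in DC1, a randomly chosen coordinator owns the
>> # partition for roughly 1/30 of the write requests it receives.
>> nodes_dc1 = 30
>> replication_factor = 1
>> requests = 100
>> local = requests * replication_factor / float(nodes_dc1)
>> print(local)             # ~3.3 writes stay on the coordinator
>> print(requests - local)  # ~96.7 writes must be re-routed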
>>
>>
>> *Our schema:*
>> *- identificationmr.user_data:*
>>
>> CREATE TABLE identificationmr.user_data (
>>     i_name text,
>>     i_value text,
>>     type int,
>>     creation_key bigint,
>>     data text,
>>     identifiers text,
>>     on_boarded boolean,
>>     origin text,
>>     prefix text,
>>     sub_prefix text,
>>     PRIMARY KEY ((i_name, i_value), type, creation_key, data)
>> ) WITH CLUSTERING ORDER BY (type ASC, creation_key ASC, data ASC)
>>     AND bloom_filter_fp_chance = 0.01
>>     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>>     AND comment = ''
>>     AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
>>     AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>>     AND dclocal_read_repair_chance = 0.1
>>     AND default_time_to_live = 0
>>     AND gc_grace_seconds = 864000
>>     AND max_index_interval = 2048
>>     AND memtable_flush_period_in_ms = 0
>>     AND min_index_interval = 128
>>     AND read_repair_chance = 0.0
>>     AND speculative_retry = '99.0PERCENTILE';
>>
>>
>> *- identificationmr.user_data_idx:*
>>
>> CREATE TABLE identificationmr.user_data_idx (
>>     creation_key bigint,
>>     page_number int,
>>     i_name text,
>>     i_value text,
>>     PRIMARY KEY ((creation_key, page_number), i_name, i_value)
>> ) WITH CLUSTERING ORDER BY (i_name ASC, i_value ASC)
>>     AND bloom_filter_fp_chance = 0.01
>>     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>>     AND comment = ''
>>     AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
>>     AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>>     AND dclocal_read_repair_chance = 0.1
>>     AND default_time_to_live = 0
>>     AND gc_grace_seconds = 864000
>>     AND max_index_interval = 2048
>>     AND memtable_flush_period_in_ms = 0
>>     AND min_index_interval = 128
>>     AND read_repair_chance = 0.0
>>     AND speculative_retry = '99.0PERCENTILE';
>>
>>
>> *Python code used to perform the write:*
>>         self.INSERT_USER_DATA = "INSERT INTO " + self.USER_DATA + " (i_name, i_value, creation_key, type, data, prefix, sub_prefix, origin, identifiers, on_boarded) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ;"
>>         self.INSERT_USER_QUEUE_REDUCER = "INSERT INTO " + self.USER_QUEUE_REDUCER + "{jn} (i_name, i_value, start_key, end_key) VALUES (?, ?, ?, ?) ;"
>>         self.INSERT_USER_QUEUE_INDEXER = "INSERT INTO " + self.USER_QUEUE_INDEXER + "{jn} (i_name, i_value, start_key, end_key) VALUES (?, ?, ?, ?) ;"
>>         self.INSERT_USER_DATA_IDX = "INSERT INTO " + self.USER_DATA_IDX + " (creation_key, page_number, i_name, i_value) VALUES (?, ?, ?, ?) ;"
>>
>>         keyspace = cfg.identification_keyspace
>>
>>         self.default_consistency_level = ConsistencyLevel.ONE
>>         global cluster, session
>>         if not cluster or (session and session.is_shutdown):
>>             logging.info("Connecting to cluster %s/%s" % (cfg.identification_hosts, cfg.identification_keyspace))
>>             if "localhost" in cfg.identification_hosts:
>>                 load_balancing_policy = TokenAwarePolicy(RoundRobinPolicy())
>>                 self.default_consistency_level = ConsistencyLevel.QUORUM
>>                 logging.info("Using load_balancing_policy = TokenAwarePolicy(RoundRobinPolicy())")
>>             else:
>>                 self.default_consistency_level = ConsistencyLevel.ONE
>>                 load_balancing_policy = TokenAwarePolicy(DCAwareRoundRobinPolicy('DC1'))
>>                 logging.info("Using load_balancing_policy = TokenAwarePolicy(DCAwareRoundRobinPolicy('DC1'))")
>>                 # load_balancing_policy = DCAwareRoundRobinPolicy('DC1')
>>                 # logging.info("Using load_balancing_policy = DCAwareRoundRobinPolicy('DC1')")
>>             cluster = get_cluster(json.loads(cfg.identification_hosts),
>>                                   port=int(cfg.identification_port),
>>                                   control_connection_timeout=100000,
>>                                   reconnection_policy=Cluster.reconnection_policy,
>>                                   load_balancing_policy=load_balancing_policy)
>>         if not session or session.is_shutdown:
>>             logging.info("Connecting to keyspace %s/%s" % (cfg.identification_hosts, cfg.identification_keyspace))
>>             session = cluster.connect(keyspace)
>>         self.session = session
>>
>>         self.session.default_consistency_level = self.default_consistency_level
>>         self.session.default_timeout = 100000
>>         self.write_on_queues = write_on_queues
>>         self.max_batch_size = max_batch_size
>>
>>         self.PREPARED_INSERT_USER_DATA = self.session.prepare(self.INSERT_USER_DATA)
>>         self.PREPARED_INSERT_USER_DATA_IDX = self.session.prepare(self.INSERT_USER_DATA_IDX)
>>
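>> For context, the actual writes are then issued against those prepared
>> statements roughly like this (the variable names below are placeholders,
>> just to show that the bound partition key columns are what give the
>> driver a routing key):
>>
>>         # Illustrative only -- binding (i_name, i_value) is what lets the
>>         # token-aware policy compute the token for this write.
>>         self.session.execute(self.PREPARED_INSERT_USER_DATA,
>>                              (i_name, i_value, creation_key, type_, data, prefix,
>>                               sub_prefix, origin, identifiers, on_boarded))
>>         self.session.execute(self.PREPARED_INSERT_USER_DATA_IDX,
>>                              (creation_key, page_number, i_name, i_value))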
>>
>> *Please, at the link below it is possible to find the results for:*
>> - nodetool info
>> - nodetool cfstats identificationmr.user_data
>> - nodetool cfstats identificationmr.user_data_idx
>> - nodetool proxyhistograms
>> - nodetool cfhistograms identificationmr.user_data
>> - nodetool cfhistograms identificationmr.user_data_idx
>>
>> Link:
>> https://www.dropbox.com/sh/x1xb53aomzit0ov/AAAVOOAhyDFfpA3zr8AtOCTWa?dl=0
>>
>>
>> Please, could you give us any help or hints?
>>
>> So sorry for the long/detailed email and thanks in advance.
>>
>> Regards,
>> Gabriel.
>>
>>
>
