cassandra-user mailing list archives

From Karl Rieb <karl.r...@gmail.com>
Subject Re: Problems with unbalanced write requests using TokenAware policy
Date Wed, 12 Nov 2014 00:09:42 GMT
I know you are using the python-driver, but the Java driver recently fixed
an issue with this:

  https://datastax-oss.atlassian.net/browse/JAVA-504

Looking at the python-driver source code, it appears to have the same bug,
where it always returns the first healthy replica:


https://github.com/datastax/python-driver/blob/422f2d53f90d8817af11e3043e16a542352b63c4/cassandra/policies.py#L366

The Java driver now shuffles the replicas before returning them, so as to
distribute the load better.  You may want to open up an issue with the
python-driver or at least send a message to their mailing list.
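
If you need something in the meantime, a possible workaround is to subclass
TokenAwarePolicy and shuffle the replicas yourself. A rough, untested sketch
(note it relies on the driver's private _child_policy and _cluster_metadata
attributes, so treat it as illustrative only):

    import random

    from cassandra.policies import HostDistance, TokenAwarePolicy

    class ShufflingTokenAwarePolicy(TokenAwarePolicy):
        # Builds the same query plan as TokenAwarePolicy, except the
        # replica set is shuffled before being yielded (mirroring the
        # JAVA-504 fix), so writes spread across all replicas.
        def make_query_plan(self, working_keyspace=None, query=None):
            keyspace = query.keyspace if (query and query.keyspace) else working_keyspace
            child = self._child_policy
            routing_key = query.routing_key if query is not None else None

            if routing_key is None or keyspace is None:
                # No routing information; defer entirely to the child policy.
                for host in child.make_query_plan(keyspace, query):
                    yield host
                return

            # Yield the (shuffled) live local replicas first.
            replicas = list(self._cluster_metadata.get_replicas(keyspace, routing_key))
            random.shuffle(replicas)
            for replica in replicas:
                if replica.is_up and child.distance(replica) == HostDistance.LOCAL:
                    yield replica

            # Then fall back to the remaining hosts from the child policy.
            for host in child.make_query_plan(keyspace, query):
                if host not in replicas:
                    yield host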

-Karl



On Tue, Nov 11, 2014 at 3:14 PM, Gabriel Menegatti <gabriel@s1mbi0se.com.br>
wrote:

> Hello,
>
> We are facing a "difficult" problem when using the TokenAware policy for
> writes: our write requests are always unbalanced for some unknown reason.
> Maybe something is wrong with our data model?
>
> We are trying to use Token Aware for the writes because, as far as we
> understand, the objective of the Token Aware policy is to send each write
> request directly to the node that owns the data according to the
> token/routing_key, so the request doesn't need to be received by a
> coordinator and then re-routed to the right node. Is our understanding
> right?
>
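> A minimal sketch of that understanding, using the driver's prepared
> statement API (the session object and the literal values here are just
> placeholders for illustration):
>
>     prepared = session.prepare(
>         "INSERT INTO identificationmr.user_data_idx "
>         "(creation_key, page_number, i_name, i_value) VALUES (?, ?, ?, ?)")
>     bound = prepared.bind((123456789, 0, 'email', 'user@example.com'))
>     # The driver derives bound.routing_key from the bound partition key
>     # columns (creation_key, page_number), and TokenAwarePolicy should
>     # then try the replica(s) that own that token first.
>     session.execute(bound)
>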
> We have a cluster with 2 DCs, where DC1 has 30 servers and DC2 has 10
> servers (but with the same storage capacity as DC1). We are using DSE
> Community 2.1.1 and the DataStax Python Driver 2.1.2.
>
> Our configurations are:
> - Murmur3Partitioner
> - vnodes (num_tokens: 256)
> - NetworkTopologyStrategy
>
> The keyspace is called "identificationmr" and the replication factor used
> is 1 for DC1 and 2 for DC2.
>
> We are performing all the writes with consistency level ONE (not
> LOCAL_ONE), using the DC1 nodes as seeds.
>
> If we take a look at the data distribution, everything seems to be working
> well and balanced, see image below:
>
> [image: Inline image 1]
>
>
>
> *So the problem is:*
> We tried writing data using several load balancing policies to see what is
> going on, but so far we have no conclusions and all the write requests
> remain unbalanced.
>
> 1) When we try to write data to the column family "user_data" or
> "user_data_idx" using the token aware policy, for some reason one specific
> node receives most of the write requests (maybe because it's somehow acting
> as a coordinator, even though it should not), see image below:
>
> [image: Inline image 3]
>
>
> 2) If we try to make the writes using the DC Aware Round Robin policy,
> everything is "equally distributed", but a few minutes later the
> cluster/nodes start to give timeouts, probably because all the nodes are
> acting as coordinators and re-routing write requests to other nodes all the
> time. Since our replication factor is 1 and we have 30 servers, for every
> 100 write requests we send to a node, only around 3 (1/30 ≈ 3%) remain on
> the node that received the request and the rest need to be re-routed.
>
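> (As a quick sanity check of that ~3% figure, a tiny simulation with no
> driver involved: pick a random coordinator and a random token owner among
> 30 nodes and count how often they coincide.)
>
>     import random
>
>     nodes, trials = 30, 100000
>     local = sum(1 for _ in range(trials)
>                 if random.randrange(nodes) == random.randrange(nodes))
>     print(100.0 * local / trials)  # ~3.3, i.e. ~3 of every 100 writes stay local
>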
>
> *Our schema:*
> *- identificationmr.user_data:*
>
> CREATE TABLE identificationmr.user_data (
>     i_name text,
>     i_value text,
>     type int,
>     creation_key bigint,
>     data text,
>     identifiers text,
>     on_boarded boolean,
>     origin text,
>     prefix text,
>     sub_prefix text,
>     PRIMARY KEY ((i_name, i_value), type, creation_key, data)
> ) WITH CLUSTERING ORDER BY (type ASC, creation_key ASC, data ASC)
>     AND bloom_filter_fp_chance = 0.01
>     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>     AND comment = ''
>     AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
>     AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>     AND dclocal_read_repair_chance = 0.1
>     AND default_time_to_live = 0
>     AND gc_grace_seconds = 864000
>     AND max_index_interval = 2048
>     AND memtable_flush_period_in_ms = 0
>     AND min_index_interval = 128
>     AND read_repair_chance = 0.0
>     AND speculative_retry = '99.0PERCENTILE';
>
>
> *- identificationmr.user_data_idx:*
>
> CREATE TABLE identificationmr.user_data_idx (
>     creation_key bigint,
>     page_number int,
>     i_name text,
>     i_value text,
>     PRIMARY KEY ((creation_key, page_number), i_name, i_value)
> ) WITH CLUSTERING ORDER BY (i_name ASC, i_value ASC)
>     AND bloom_filter_fp_chance = 0.01
>     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>     AND comment = ''
>     AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
>     AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>     AND dclocal_read_repair_chance = 0.1
>     AND default_time_to_live = 0
>     AND gc_grace_seconds = 864000
>     AND max_index_interval = 2048
>     AND memtable_flush_period_in_ms = 0
>     AND min_index_interval = 128
>     AND read_repair_chance = 0.0
>     AND speculative_retry = '99.0PERCENTILE';
>
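> To check whether the partition keys themselves hash unevenly, we could
> count which node each sample write would be routed to, roughly like the
> sketch below (sample_rows is a made-up list of tuples; cluster and the
> prepared statement come from our code further down):
>
>     from collections import Counter
>
>     targets = Counter()
>     for creation_key, page_number, i_name, i_value in sample_rows:
>         bound = PREPARED_INSERT_USER_DATA_IDX.bind(
>             (creation_key, page_number, i_name, i_value))
>         replicas = cluster.metadata.get_replicas('identificationmr',
>                                                  bound.routing_key)
>         if replicas:
>             targets[replicas[0].address] += 1  # first replica = token owner
>     for address, count in targets.most_common():
>         print(address, count)
>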
>
> *Python code used to perform the write:*
>         self.INSERT_USER_DATA = "INSERT INTO " + self.USER_DATA + " (i_name, i_value, creation_key, type, data, prefix, sub_prefix, origin, identifiers, on_boarded) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ;"
>         self.INSERT_USER_QUEUE_REDUCER = "INSERT INTO " + self.USER_QUEUE_REDUCER + "{jn} (i_name, i_value, start_key, end_key) VALUES (?, ?, ?, ?) ;"
>         self.INSERT_USER_QUEUE_INDEXER = "INSERT INTO " + self.USER_QUEUE_INDEXER + "{jn} (i_name, i_value, start_key, end_key) VALUES (?, ?, ?, ?) ;"
>         self.INSERT_USER_DATA_IDX = "INSERT INTO " + self.USER_DATA_IDX + " (creation_key, page_number, i_name, i_value) VALUES (?, ?, ?, ?) ;"
>
>         keyspace = cfg.identification_keyspace
>
>         self.default_consistency_level = ConsistencyLevel.ONE
>         global cluster, session
>         if not cluster or (session and session.is_shutdown):
>             logging.info("Connecting to cluster %s/%s" % (cfg.identification_hosts, cfg.identification_keyspace))
>             if "localhost" in cfg.identification_hosts:
>                 load_balancing_policy = TokenAwarePolicy(RoundRobinPolicy())
>                 self.default_consistency_level = ConsistencyLevel.QUORUM
>                 logging.info("Using load_balancing_policy = TokenAwarePolicy(RoundRobinPolicy())")
>             else:
>                 self.default_consistency_level = ConsistencyLevel.ONE
>                 load_balancing_policy = TokenAwarePolicy(DCAwareRoundRobinPolicy('DC1'))
>                 logging.info("Using load_balancing_policy = TokenAwarePolicy(DCAwareRoundRobinPolicy('DC1'))")
>                 # load_balancing_policy = DCAwareRoundRobinPolicy('DC1')
>                 # logging.info("Using load_balancing_policy = DCAwareRoundRobinPolicy('DC1')")
>             cluster = get_cluster(json.loads(cfg.identification_hosts),
>                                   port=int(cfg.identification_port),
>                                   control_connection_timeout=100000,
>                                   reconnection_policy=Cluster.reconnection_policy,
>                                   load_balancing_policy=load_balancing_policy)
>         if not session or session.is_shutdown:
>             logging.info("Connecting to keyspace %s/%s" % (cfg.identification_hosts, cfg.identification_keyspace))
>             session = cluster.connect(keyspace)
>         self.session = session
>
>         self.session.default_consistency_level = self.default_consistency_level
>         self.session.default_timeout = 100000
>         self.write_on_queues = write_on_queues
>         self.max_batch_size = max_batch_size
>
>         self.PREPARED_INSERT_USER_DATA = self.session.prepare(self.INSERT_USER_DATA)
>         self.PREPARED_INSERT_USER_DATA_IDX = self.session.prepare(self.INSERT_USER_DATA_IDX)
>
>
> *Please, on the link below it's possible to find the results for:*
> - nodetool info
> - nodetool cfstats identificationmr.user_data
> - nodetool cfstats identificationmr.user_data_idx
> - nodetool proxyhistograms
> - nodetool cfhistograms identificationmr.user_data
> - nodetool cfhistograms identificationmr.user_data_idx
>
> Link:
> https://www.dropbox.com/sh/x1xb53aomzit0ov/AAAVOOAhyDFfpA3zr8AtOCTWa?dl=0
>
>
> Please, any help or hints would be appreciated.
>
> So sorry for the long/detailed email and thanks in advance.
>
> Regards,
> Gabriel.
>
>
