Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 41523FA38 for ; Fri, 29 Mar 2013 14:22:42 +0000 (UTC) Received: (qmail 37822 invoked by uid 500); 29 Mar 2013 13:54:37 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 37784 invoked by uid 500); 29 Mar 2013 13:54:36 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 37373 invoked by uid 99); 29 Mar 2013 13:54:16 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Mar 2013 13:54:16 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 475D1833E20; Fri, 29 Mar 2013 13:54:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: brandonwilliams@apache.org To: commits@cassandra.apache.org Date: Fri, 29 Mar 2013 13:54:15 -0000 Message-Id: <4879331522f7418ca4664638533e0cab@git.apache.org> In-Reply-To: <297b9fca1f2a44d190b5d3d73b8bf78d@git.apache.org> References: <297b9fca1f2a44d190b5d3d73b8bf78d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/4] git commit: Fix fat client. Patch by Carl Yeksigian, reviewed by brandonwilliams for CASSANDRA-5378 Fix fat client. 
Patch by Carl Yeksigian, reviewed by brandonwilliams for CASSANDRA-5378 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/338fa9a3 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/338fa9a3 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/338fa9a3 Branch: refs/heads/trunk Commit: 338fa9a30747d18befc0ff66b5cb5265f7bb65f1 Parents: 1ccbcdd Author: Brandon Williams Authored: Fri Mar 29 08:53:24 2013 -0500 Committer: Brandon Williams Committed: Fri Mar 29 08:53:24 2013 -0500 ---------------------------------------------------------------------- examples/client_only/conf/cassandra.yaml | 686 +++++++++------ examples/client_only/src/ClientOnlyExample.java | 193 ++--- .../cassandra/cql3/statements/SelectStatement.java | 10 +- src/java/org/apache/cassandra/gms/Gossiper.java | 12 +- .../cassandra/service/AbstractReadExecutor.java | 14 +- .../apache/cassandra/service/MigrationManager.java | 3 + .../apache/cassandra/service/StorageService.java | 41 +- .../locator/DynamicEndpointSnitchTest.java | 2 +- .../apache/cassandra/service/InitClientTest.java | 2 +- .../service/StorageServiceClientTest.java | 2 +- 10 files changed, 577 insertions(+), 388 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/338fa9a3/examples/client_only/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/examples/client_only/conf/cassandra.yaml b/examples/client_only/conf/cassandra.yaml index d9bc0d6..f372dbf 100644 --- a/examples/client_only/conf/cassandra.yaml +++ b/examples/client_only/conf/cassandra.yaml @@ -9,7 +9,22 @@ # one logical cluster from joining another. 
cluster_name: 'Test Cluster' -# You should always specify InitialToken when setting up a production +# This defines the number of tokens randomly assigned to this node on the ring +# The more tokens, relative to other nodes, the larger the proportion of data +# that this node will store. You probably want all nodes to have the same number +# of tokens assuming they have equal hardware capability. +# +# If you leave this unspecified, Cassandra will use the default of 1 token for legacy compatibility, +# and will use the initial_token as described below. +# +# Specifying initial_token will override this setting. +# +# If you already have a cluster with 1 token per node, and wish to migrate to +# multiple tokens per node, see http://wiki.apache.org/cassandra/Operations +# num_tokens: 256 + +# If you haven't specified num_tokens, or have set it to the default of 1 then +# you should always specify InitialToken when setting up a production # cluster for the first time, and often when adding capacity later. # The principle is that each node should be given an equal slice of # the token ring; see http://wiki.apache.org/cassandra/Operations @@ -21,21 +36,23 @@ cluster_name: 'Test Cluster' # a random token, which will lead to hot spots. initial_token: -# Set to true to make new [non-seed] nodes automatically migrate data -# to themselves from the pre-existing nodes in the cluster. Defaults -# to false because you can only bootstrap N machines at a time from -# an existing cluster of N, so if you are bringing up a cluster of -# 10 machines with 3 seeds you would have to do it in stages. Leaving -# this off for the initial start simplifies that. -auto_bootstrap: false - # See http://wiki.apache.org/cassandra/HintedHandoff hinted_handoff_enabled: true # this defines the maximum amount of time a dead host will have hints -# generated. After it has been dead this long, hints will be dropped. 
-max_hint_window_in_ms: 3600000 # one hour -# Sleep this long after delivering each row or row fragment -hinted_handoff_throttle_delay_in_ms: 50 +# generated. After it has been dead this long, new hints for it will not be +# created until it has been seen alive and gone down again. +max_hint_window_in_ms: 10800000 # 3 hours +# throttle in KB per second, per delivery thread +hinted_handoff_throttle_in_kb: 1024 +# Number of threads with which to deliver hints; +# Consider increasing this number when you have multi-dc deployments, since +# cross-dc handoff tends to be slower +max_hints_delivery_threads: 2 + +# The following setting populates the page cache on memtable flush and compaction +# WARNING: Enable this setting only when the whole node's data fits in memory. +# Defaults to: false +# populate_io_cache_on_flush: false # authentication backend, implementing IAuthenticator; used to identify users authenticator: org.apache.cassandra.auth.AllowAllAuthenticator @@ -46,14 +63,13 @@ authorizer: org.apache.cassandra.auth.AllowAllAuthorizer # The partitioner is responsible for distributing rows (by key) across # nodes in the cluster. Any IPartitioner may be used, including your # own as long as it is on the classpath. Out of the box, Cassandra -# provides org.apache.cassandra.dht.RandomPartitioner -# org.apache.cassandra.dht.ByteOrderedPartitioner, -# org.apache.cassandra.dht.OrderPreservingPartitioner (deprecated), -# and org.apache.cassandra.dht.CollatingOrderPreservingPartitioner -# (deprecated). +# provides org.apache.cassandra.dht.{Murmur3Partitioner, RandomPartitioner, +# ByteOrderedPartitioner, OrderPreservingPartitioner (deprecated)}. # # - RandomPartitioner distributes rows across the cluster evenly by md5. -# When in doubt, this is the best option. +# This was the default prior to 1.2 and is retained for compatibility. +# - Murmur3Partitioner is similar to RandomPartitioner but uses Murmur3_128 +# Hash Function instead of md5.
When in doubt, this is the best option. # - ByteOrderedPartitioner orders rows lexically by key bytes. BOP allows # scanning rows in key order, but the ordering can generate hot spots # for sequential insertion workloads. @@ -65,71 +81,142 @@ authorizer: org.apache.cassandra.auth.AllowAllAuthorizer # # See http://wiki.apache.org/cassandra/Operations for more on # partitioners and token selection. -partitioner: org.apache.cassandra.dht.RandomPartitioner +partitioner: org.apache.cassandra.dht.Murmur3Partitioner # directories where Cassandra should store data on disk. data_file_directories: - - /var/lib/cassandra/data + - /tmp/fat-client/data # commit log -commitlog_directory: /var/lib/cassandra/commitlog +commitlog_directory: /tmp/fat-client/commitlog + +# policy for data disk failures: +# stop: shut down gossip and Thrift, leaving the node effectively dead, but +# still inspectable via JMX. +# best_effort: stop using the failed disk and respond to requests based on +# remaining available sstables. This means you WILL see obsolete +# data at CL.ONE! +# ignore: ignore fatal errors and let requests fail, as in pre-1.2 Cassandra +disk_failure_policy: stop + +# Maximum size of the key cache in memory. +# +# Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the +# minimum, sometimes more. The key cache is fairly tiny for the amount of +# time it saves, so it's worthwhile to use it at large numbers. +# The row cache saves even more time, but must store the whole values of +# its rows, so it is extremely space-intensive. It's best to only use the +# row cache if you have hot rows or static rows. +# +# NOTE: if you reduce the size, you may not get your hottest keys loaded on startup. +# +# Default value is empty to make it "auto" (min(5% of Heap (in MB), 100MB)). Set to 0 to disable key cache. +key_cache_size_in_mb: + +# Duration in seconds after which Cassandra should +# save the key cache.
Caches are saved to saved_caches_directory as +# specified in this configuration file. +# +# Saved caches greatly improve cold-start speeds, and are relatively cheap in +# terms of I/O for the key cache. Row cache saving is much more expensive and +# has limited use. +# +# Default is 14400 or 4 hours. +key_cache_save_period: 14400 + +# Number of keys from the key cache to save +# Disabled by default, meaning all keys are going to be saved +# key_cache_keys_to_save: 100 + +# Maximum size of the row cache in memory. +# NOTE: if you reduce the size, you may not get your hottest keys loaded on startup. +# +# Default value is 0, to disable row caching. +row_cache_size_in_mb: 0 + +# Duration in seconds after which Cassandra should +# save the row cache. Caches are saved to saved_caches_directory as specified +# in this configuration file. +# +# Saved caches greatly improve cold-start speeds, and are relatively cheap in +# terms of I/O for the key cache. Row cache saving is much more expensive and +# has limited use. +# +# Default is 0 to disable saving the row cache. +row_cache_save_period: 0 + +# Number of keys from the row cache to save +# Disabled by default, meaning all keys are going to be saved +# row_cache_keys_to_save: 100 + +# The provider for the row cache to use. +# +# Supported values are: ConcurrentLinkedHashCacheProvider, SerializingCacheProvider +# +# SerializingCacheProvider serialises the contents of the row and stores +# it in native memory, i.e., off the JVM Heap. Serialized rows take +# significantly less memory than "live" rows in the JVM, so you can cache +# more rows in a given memory footprint. And storing the cache off-heap +# means you can use smaller heap sizes, reducing the impact of GC pauses. +# +# It is also valid to specify the fully-qualified class name to a class +# that implements org.apache.cassandra.cache.IRowCacheProvider.
+# +# Defaults to SerializingCacheProvider +row_cache_provider: SerializingCacheProvider + +# The pluggable memory allocator for the off-heap row cache. Experiments show that JEMalloc +# saves some memory compared to the native GCC allocator. +# +# Supported values are: NativeAllocator, JEMallocAllocator +# +# If you intend to use JEMallocAllocator you have to install JEMalloc as a library and +# modify cassandra-env.sh as directed in the file. +# +# Defaults to NativeAllocator +# memory_allocator: NativeAllocator # saved caches -saved_caches_directory: /var/lib/cassandra/saved_caches +saved_caches_directory: /tmp/fat-client/saved_caches # commitlog_sync may be either "periodic" or "batch." # When in batch mode, Cassandra won't ack writes until the commit log # has been fsynced to disk. It will wait up to -# CommitLogSyncBatchWindowInMS milliseconds for other writes, before +# commitlog_sync_batch_window_in_ms milliseconds for other writes, before # performing the sync. -commitlog_sync: periodic - -# the other option is "timed," where writes may be acked immediately +# +# commitlog_sync: batch +# commitlog_sync_batch_window_in_ms: 50 +# +# the other option is "periodic" where writes may be acked immediately # and the CommitLog is simply synced every commitlog_sync_period_in_ms # milliseconds. +commitlog_sync: periodic commitlog_sync_period_in_ms: 10000 -# emergency pressure valve: each time heap usage after a full (CMS) -# garbage collection is above this fraction of the max, Cassandra will -# flush the largest memtables. -# -# Set to 1.0 to disable. Setting this lower than -# CMSInitiatingOccupancyFraction is not likely to be useful. -# -# RELYING ON THIS AS YOUR PRIMARY TUNING MECHANISM WILL WORK POORLY: -# it is most effective under light to moderate load, or read-heavy -# workloads; under truly massive write load, it will often be too -# little, too late.
-flush_largest_memtables_at: 0.75 - -# emergency pressure valve #2: the first time heap usage after a full -# (CMS) garbage collection is above this fraction of the max, -# Cassandra will reduce cache maximum _capacity_ to the given fraction -# of the current _size_. Should usually be set substantially above -# flush_largest_memtables_at, since that will have less long-term -# impact on the system. -# -# Set to 1.0 to disable. Setting this lower than -# CMSInitiatingOccupancyFraction is not likely to be useful. -reduce_cache_sizes_at: 0.85 -reduce_cache_capacity_to: 0.6 - -# Addresses of hosts that are deemed contact points. -# Cassandra nodes use this list of hosts to find each other and learn -# the topology of the ring. You must change this if you are running -# multiple nodes! -seeds: - - 127.0.0.1 - -# Access mode. mmapped i/o is substantially faster, but only practical on -# a 64bit machine (which notably does not include EC2 "small" instances) -# or relatively small datasets. "auto", the safe choice, will enable -# mmapping on a 64bit JVM. Other values are "mmap", "mmap_index_only" -# (which may allow you to get part of the benefits of mmap on a 32bit -# machine by mmapping only index files) and "standard". -# (The buffer size settings that follow only apply to standard, -# non-mmapped i/o.) -disk_access_mode: auto +# The size of the individual commitlog file segments. A commitlog +# segment may be archived, deleted, or recycled once all the data +# in it (potentially from each columnfamily in the system) has been +# flushed to sstables. +# +# The default size is 32, which is almost always fine, but if you are +# archiving commitlog segments (see commitlog_archiving.properties), +# then you probably want a finer granularity of archiving; 8 or 16 MB +# is reasonable. +commitlog_segment_size_in_mb: 32 + +# any class that implements the SeedProvider interface and has a +# constructor that takes a Map of parameters will do.
+seed_provider: + # Addresses of hosts that are deemed contact points. + # Cassandra nodes use this list of hosts to find each other and learn + # the topology of the ring. You must change this if you are running + # multiple nodes! + - class_name: org.apache.cassandra.locator.SimpleSeedProvider + parameters: + # seeds is actually a comma-delimited list of addresses. + # Ex: "<ip1>,<ip2>,<ip3>" + - seeds: "127.0.0.1" # For workloads with more data than can fit in memory, Cassandra's # bottleneck will be reads that need to fetch data from @@ -143,6 +230,21 @@ disk_access_mode: auto concurrent_reads: 32 concurrent_writes: 32 +# Total memory to use for memtables. Cassandra will flush the largest +# memtable when this much memory is used. +# If omitted, Cassandra will set it to 1/3 of the heap. +# memtable_total_space_in_mb: 2048 + +# Total space to use for commitlogs. Since commitlog segments are +# mmapped, and hence use up address space, the default size is 32 +# on 32-bit JVMs, and 1024 on 64-bit JVMs. +# +# If space gets above this value (it will round up to the next nearest +# segment multiple), Cassandra will flush every dirty CF in the oldest +# segment and remove it. So a small total commitlog space will tend +# to cause more flush activity on less-active columnfamilies. +# commitlog_total_space_in_mb: 4096 + # This sets the amount of memtable flush writer threads. These will # be blocked by disk io, and each one will hold a memtable in memory # while blocked. If you have a large heap and many data directories, @@ -150,9 +252,26 @@ concurrent_writes: 32 # By default this will be set to the amount of data directories defined. #memtable_flush_writers: 1 +# the number of full memtables to allow pending flush, that is, +# waiting for a writer thread. At a minimum, this should be set to +# the maximum number of secondary indexes created on a single CF.
+memtable_flush_queue_size: 4 + +# Whether to, when doing sequential writing, fsync() at intervals in +# order to force the operating system to flush the dirty +# buffers. Enable this to avoid sudden dirty buffer flushing from +# impacting read latencies. Almost always a good idea on SSDs; not +# necessarily on platters. +trickle_fsync: false +trickle_fsync_interval_in_kb: 10240 + # TCP port, for commands and data storage_port: 7000 +# SSL port, for encrypted communication. Unused unless enabled in +# encryption_options +ssl_storage_port: 7001 + # Address to bind to and tell other Cassandra nodes to connect to. You # _must_ change this if you want multiple nodes to be able to # communicate! @@ -165,45 +284,106 @@ storage_port: 7000 # Setting this to 0.0.0.0 is always wrong. listen_address: 127.0.0.2 +# Address to broadcast to other Cassandra nodes +# Leaving this blank will set it to the same value as listen_address +# broadcast_address: 1.2.3.4 + + +# Whether to start the native transport server. +# Currently, only the thrift server is started by default because the native +# transport is considered beta. +# Please note that the address on which the native transport is bound is the +# same as the rpc_address. The port however is different and specified below. +start_native_transport: false +# port for the CQL native transport to listen for clients on +native_transport_port: 9042 +# The minimum and maximum threads for handling requests when the native +# transport is used. Their meaning is similar to that of +# rpc_min_threads and rpc_max_threads, though the defaults differ slightly and +# are the ones below: +# native_transport_min_threads: 16 +# native_transport_max_threads: 128 + + +# Whether to start the thrift rpc server. +start_rpc: false # The address to bind the Thrift RPC service to -- clients connect # here. Unlike ListenAddress above, you *can* specify 0.0.0.0 here if # you want Thrift to listen on all interfaces.
# # Leaving this blank has the same effect it does for ListenAddress, # (i.e. it will be based on the configured hostname of the node). -rpc_address: 127.0.0.2 +rpc_address: localhost # port for Thrift to listen for clients on rpc_port: 9160 # enable or disable keepalive on rpc connections rpc_keepalive: true +# Cassandra provides two out-of-the-box options for the RPC Server: +# +# sync -> One thread per thrift connection. For a very large number of clients, memory +# will be your limiting factor. On a 64 bit JVM, 128KB is the minimum stack size +# per thread, and that will correspond to your use of virtual memory (but physical memory +# may be limited depending on use of stack space). +# +# hsha -> Stands for "half synchronous, half asynchronous." All thrift clients are handled +# asynchronously using a small number of threads that do not vary with the number +# of thrift clients (and thus scales well to many clients). The rpc requests are still +# synchronous (one thread per active request). +# +# The default is sync because on Windows hsha is about 30% slower. On Linux, +# sync/hsha performance is about the same, with hsha of course using less memory. +# +# Alternatively, you can provide your own RPC server by providing the fully-qualified class name +# of an o.a.c.t.TServerFactory that can create an instance of it. +rpc_server_type: sync + +# Uncomment rpc_min|max_threads to set request pool size limits. +# +# Regardless of your choice of RPC server (see above), the number of maximum requests in the +# RPC thread pool dictates how many concurrent requests are possible (but if you are using the sync +# RPC server, it also dictates the number of clients that can be connected at all). +# +# The default is unlimited and thus provides no protection against clients overwhelming the server.
You are +# encouraged to set a maximum that makes sense for you in production, but do keep in mind that +# rpc_max_threads represents the maximum number of client requests this server may execute concurrently. +# +# rpc_min_threads: 16 +# rpc_max_threads: 2048 + # uncomment to set socket buffer sizes on rpc connections # rpc_send_buff_size_in_bytes: # rpc_recv_buff_size_in_bytes: +# uncomment to set socket buffer size for internode communication +# internode_send_buff_size_in_bytes: +# internode_recv_buff_size_in_bytes: + # Frame size for thrift (maximum field length). -# 0 disables TFramedTransport in favor of TSocket. This option -# is deprecated; we strongly recommend using Framed mode. thrift_framed_transport_size_in_mb: 15 # The max length of a thrift message, including all fields and # internal thrift overhead. thrift_max_message_length_in_mb: 16 +# Set to true to have Cassandra create a hard link to each sstable +# flushed or streamed locally in a backups/ subdirectory of the +# Keyspace data. Removing these links is the operator's +# responsibility. +incremental_backups: false + # Whether or not to take a snapshot before each compaction. Be # careful using this option, since Cassandra won't clean up the # snapshots for you. Mostly useful if you're paranoid when there # is a data format change. snapshot_before_compaction: false -# change this to increase the compaction thread's priority. In java, 1 is the -# lowest priority and that is our default. -# compaction_thread_priority: 1 - -# The threshold size in megabytes the binary memtable must grow to, -# before it's submitted for flushing to disk. -binary_memtable_throughput_in_mb: 256 +# Whether or not a snapshot is taken of the data before keyspace truncation +# or dropping of column families. The STRONGLY advised default of true +# should be used to provide data safety. If you set this flag to false, you will +# lose data on truncation or drop. 
+auto_snapshot: true # Add column indexes to a row after its contents reach this size. # Increase if your column values are large, or if you have a very large @@ -219,39 +399,129 @@ column_index_size_in_kb: 64 # will be logged specifying the row key. in_memory_compaction_limit_in_mb: 64 +# Number of simultaneous compactions to allow, NOT including +# validation "compactions" for anti-entropy repair. Simultaneous +# compactions can help preserve read performance in a mixed read/write +# workload, by mitigating the tendency of small sstables to accumulate +# during a single long-running compaction. The default is usually +# fine and if you experience problems with compaction running too +# slowly or too fast, you should look at +# compaction_throughput_mb_per_sec first. +# +# concurrent_compactors defaults to the number of cores. +# Uncomment to make compaction mono-threaded, the pre-0.8 default. +#concurrent_compactors: 1 + +# Multi-threaded compaction. When enabled, each compaction will use +# up to one thread per core, plus one thread per sstable being merged. +# This is usually only useful for SSD-based hardware: otherwise, +# your concern is usually to get compaction to do LESS i/o (see: +# compaction_throughput_mb_per_sec), not more. +multithreaded_compaction: false + +# Throttles compaction to the given total throughput across the entire +# system. The faster you insert data, the faster you need to compact in +# order to keep the sstable count down, but in general, setting this to +# 16 to 32 times the rate you are inserting data is more than sufficient. +# Setting this to 0 disables throttling. Note that this accounts for all types +# of compaction, including validation compaction. +compaction_throughput_mb_per_sec: 16 + # Track cached row keys during compaction, and re-cache their new # positions in the compacted sstable. Disable if you use really large # key caches.
compaction_preheat_key_cache: true -# Time to wait for a reply from other nodes before failing the command -rpc_timeout_in_ms: 10000 +# Throttles all outbound streaming file transfers on this node to the +# given total throughput in Mbps. This is necessary because Cassandra does +# mostly sequential IO when streaming data during bootstrap or repair, which +# can lead to saturating the network connection and degrading rpc performance. +# When unset, the default is 400 Mbps or 50 MB/s. +# stream_throughput_outbound_megabits_per_sec: 400 + +# How long the coordinator should wait for read operations to complete +read_request_timeout_in_ms: 10000 +# How long the coordinator should wait for seq or index scans to complete +range_request_timeout_in_ms: 10000 +# How long the coordinator should wait for writes to complete +write_request_timeout_in_ms: 10000 +# How long the coordinator should wait for truncates to complete +# (This can be much longer, because unless auto_snapshot is disabled +# we need to flush first so we can snapshot before removing the data.) +truncate_request_timeout_in_ms: 60000 +# The default timeout for other, miscellaneous operations +request_timeout_in_ms: 10000 + +# Enable operation timeout information exchange between nodes to accurately +# measure request timeouts. If disabled, Cassandra will assume the request +# was forwarded to the replica instantly by the coordinator +# +# Warning: before enabling this property make sure ntp is installed +# and the times are synchronized between the nodes. +cross_node_timeout: false + +# Enable socket timeout for streaming operation. +# When a timeout occurs during streaming, streaming is retried from the start +# of the current file. This *can* involve re-streaming a significant amount of +# data, so you should avoid setting the value too low. +# Default value is 0, which never times out streams. +# streaming_socket_timeout_in_ms: 0 # phi value that must be reached for a host to be marked down.
# most users should never need to adjust this. # phi_convict_threshold: 8 # endpoint_snitch -- Set this to a class that implements -# IEndpointSnitch, which will let Cassandra know enough -# about your network topology to route requests efficiently. +# IEndpointSnitch. The snitch has two functions: +# - it teaches Cassandra enough about your network topology to route +# requests efficiently +# - it allows Cassandra to spread replicas around your cluster to avoid +# correlated failures. It does this by grouping machines into +# "datacenters" and "racks." Cassandra will do its best not to have +# more than one replica on the same "rack" (which may not actually +# be a physical location) +# +# IF YOU CHANGE THE SNITCH AFTER DATA IS INSERTED INTO THE CLUSTER, +# YOU MUST RUN A FULL REPAIR, SINCE THE SNITCH AFFECTS WHERE REPLICAS +# ARE PLACED. +# # Out of the box, Cassandra provides -# - org.apache.cassandra.locator.SimpleSnitch: +# - SimpleSnitch: # Treats Strategy order as proximity. This improves cache locality # when disabling read repair, which can further improve throughput. -# - org.apache.cassandra.locator.RackInferringSnitch: +# Only appropriate for single-datacenter deployments. +# - PropertyFileSnitch: # Proximity is determined by rack and data center, which are -# assumed to correspond to the 3rd and 2nd octet of each node's -# IP address, respectively -# org.apache.cassandra.locator.PropertyFileSnitch: -# - Proximity is determined by rack and data center, which are # explicitly configured in cassandra-topology.properties. -endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch +# - GossipingPropertyFileSnitch +# The rack and datacenter for the local node are defined in +# cassandra-rackdc.properties and propagated to other nodes via gossip. If +# cassandra-topology.properties exists, it is used as a fallback, allowing +# migration from the PropertyFileSnitch. 
+# - RackInferringSnitch: +# Proximity is determined by rack and data center, which are +# assumed to correspond to the 3rd and 2nd octet of each node's +# IP address, respectively. Unless this happens to match your +# deployment conventions (as it did Facebook's), this is best used +# as an example of writing a custom Snitch class. +# - Ec2Snitch: +# Appropriate for EC2 deployments in a single Region. Loads Region +# and Availability Zone information from the EC2 API. The Region is +# treated as the Datacenter, and the Availability Zone as the rack. +# Only private IPs are used, so this will not work across multiple +# Regions. +# - Ec2MultiRegionSnitch: +# Uses public IPs as broadcast_address to allow cross-region +# connectivity. (Thus, you should set seed addresses to the public +# IP as well.) You will need to open the storage_port or +# ssl_storage_port on the public IP firewall. (For intra-Region +# traffic, Cassandra will switch to the private IP after +# establishing a connection.) +# +# You can use a custom Snitch by setting this to the full class name +# of the snitch, which will be assumed to be on your classpath. +endpoint_snitch: SimpleSnitch -# dynamic_snitch -- This boolean controls whether the above snitch is -# wrapped with a dynamic snitch, which will monitor read latencies -# and avoid reading from hosts that have slowed (due to compaction, -# for instance) -dynamic_snitch: true # controls how often to perform the more expensive part of host score # calculation dynamic_snitch_update_interval_in_ms: 100 @@ -265,7 +535,7 @@ dynamic_snitch_reset_interval_in_ms: 600000 # expressed as a double which represents a percentage. Thus, a value of # 0.2 means Cassandra would continue to prefer the static snitch values # until the pinned host was 20% worse than the fastest. 
-dynamic_snitch_badness_threshold: 0.0 +dynamic_snitch_badness_threshold: 0.1 # request_scheduler -- Set this to a class that implements # RequestScheduler, which will schedule incoming client requests @@ -307,178 +577,54 @@ request_scheduler: org.apache.cassandra.scheduler.NoScheduler # the request scheduling. Currently the only valid option is keyspace. # request_scheduler_id: keyspace -# Keyspaces have ColumnFamilies. (Usually 1 KS per application.) -# ColumnFamilies have Rows. (Dozens of CFs per KS.) -# Rows contain Columns. (Many per CF.) -# Columns contain name:value:timestamp. (Many per Row.) -# -# A KS is most similar to a schema, and a CF is most similar to a relational table. -# -# Keyspaces, ColumnFamilies, and Columns may carry additional -# metadata that change their behavior. These are as follows: -# -# Keyspace required parameters: -# - name: name of the keyspace; "system" is -# reserved for Cassandra Internals. -# - replica_placement_strategy: the class that determines how replicas -# are distributed among nodes. Contains both the class as well as -# configuration information. Must extend AbstractReplicationStrategy. -# Out of the box, Cassandra provides -# * org.apache.cassandra.locator.SimpleStrategy -# * org.apache.cassandra.locator.NetworkTopologyStrategy -# * org.apache.cassandra.locator.OldNetworkTopologyStrategy -# -# SimpleStrategy merely places the first -# replica at the node whose token is closest to the key (as determined -# by the Partitioner), and additional replicas on subsequent nodes -# along the ring in increasing Token order. -# -# With NetworkTopologyStrategy, -# for each datacenter, you can specify how many replicas you want -# on a per-keyspace basis. Replicas are placed on different racks -# within each DC, if possible. This strategy also requires rack aware -# snitch, such as RackInferringSnitch or PropertyFileSnitch. 
-# An example: -# - name: Keyspace1 -# replica_placement_strategy: org.apache.cassandra.locator.NetworkTopologyStrategy -# strategy_options: -# DC1 : 3 -# DC2 : 2 -# DC3 : 1 -# -# OldNetworkToplogyStrategy [formerly RackAwareStrategy] -# places one replica in each of two datacenters, and the third on a -# different rack in in the first. Additional datacenters are not -# guaranteed to get a replica. Additional replicas after three are placed -# in ring order after the third without regard to rack or datacenter. -# - replication_factor: Number of replicas of each row -# Keyspace optional paramaters: -# - strategy_options: Additional information for the replication strategy. -# - column_families: -# ColumnFamily required parameters: -# - name: name of the ColumnFamily. Must not contain the character "-". -# - compare_with: tells Cassandra how to sort the columns for slicing -# operations. The default is BytesType, which is a straightforward -# lexical comparison of the bytes in each column. Other options are -# AsciiType, UTF8Type, LexicalUUIDType, TimeUUIDType, LongType, -# and IntegerType (a generic variable-length integer type). -# You can also specify the fully-qualified class name to a class of -# your choice extending org.apache.cassandra.db.marshal.AbstractType. -# -# ColumnFamily optional parameters: -# - keys_cached: specifies the number of keys per sstable whose -# locations we keep in memory in "mostly LRU" order. (JUST the key -# locations, NOT any column values.) Specify a fraction (value less -# than 1) or an absolute number of keys to cache. Defaults to 200000 -# keys. -# - rows_cached: specifies the number of rows whose entire contents we -# cache in memory. Do not use this on ColumnFamilies with large rows, -# or ColumnFamilies with high write:read ratios. Specify a fraction -# (value less than 1) or an absolute number of rows to cache. -# Defaults to 0. (i.e. 
row caching is off by default) -# - comment: used to attach additional human-readable information about -# the column family to its definition. -# - read_repair_chance: specifies the probability with which read -# repairs should be invoked on non-quorum reads. must be between 0 -# and 1. defaults to 1.0 (always read repair). -# - gc_grace_seconds: specifies the time to wait before garbage -# collecting tombstones (deletion markers). defaults to 864000 (10 -# days). See http://wiki.apache.org/cassandra/DistributedDeletes -# - default_validation_class: specifies a validator class to use for -# validating all the column values in the CF. -# - populate_io_cache_on_flush: populates the page cache on memtable flush -# and compaction. Defaults to false. -# NOTE: -# min_ must be less than max_compaction_threshold! -# - min_compaction_threshold: the minimum number of SSTables needed -# to start a minor compaction. increasing this will cause minor -# compactions to start less frequently and be more intensive. setting -# this to 0 disables minor compactions. defaults to 4. -# - max_compaction_threshold: the maximum number of SSTables allowed -# before a minor compaction is forced. decreasing this will cause -# minor compactions to start more frequently and be less intensive. -# setting this to 0 disables minor compactions. defaults to 32. -# /NOTE -# - row_cache_save_period_in_seconds: number of seconds between saving -# row caches. The row caches can be saved periodically and if one -# exists on startup it will be loaded. -# - key_cache_save_period_in_seconds: number of seconds between saving -# key caches. The key caches can be saved periodically and if one -# exists on startup it will be loaded. -# - memtable_flush_after_mins: The maximum time to leave a dirty table -# unflushed. This should be large enough that it won't cause a flush -# storm of all memtables during periods of inactivity. 
-# - memtable_throughput_in_mb: The maximum size of the memtable before -# it is flushed. If undefined, 1/8 * heapsize will be used. -# - memtable_operations_in_millions: Number of operations in millions -# before the memtable is flushed. If undefined, throughput / 64 * 0.3 -# will be used. -# - column_metadata: -# Column required parameters: -# - name: binds a validator (and optionally an indexer) to columns -# with this name in any row of the enclosing column family. -# - validator: like cf.compare_with, an AbstractType that checks -# that the value of the column is well-defined. -# Column optional parameters: -# NOTE: -# index_name cannot be set if index_type is not also set! -# - index_name: User-friendly name for the index. -# - index_type: The type of index to be created. Currently only -# KEYS is supported. -# /NOTE -# -# NOTE: -# this keyspace definition is for demonstration purposes only. -# Cassandra will not load these definitions during startup. See -# http://wiki.apache.org/cassandra/FAQ#no_keyspaces for an explanation. 
-# /NOTE -keyspaces: - - name: Keyspace1 - replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy - replication_factor: 1 - column_families: - - name: Standard1 - compare_with: BytesType - keys_cached: 10000 - rows_cached: 1000 - row_cache_save_period_in_seconds: 0 - key_cache_save_period_in_seconds: 3600 - memtable_flush_after_mins: 59 - memtable_throughput_in_mb: 255 - memtable_operations_in_millions: 0.29 - - - name: Standard2 - compare_with: UTF8Type - read_repair_chance: 0.1 - keys_cached: 100 - gc_grace_seconds: 0 - min_compaction_threshold: 5 - max_compaction_threshold: 31 - - - name: StandardByUUID1 - compare_with: TimeUUIDType - - - name: Super1 - column_type: Super - compare_with: BytesType - compare_subcolumns_with: BytesType - - - name: Super2 - column_type: Super - compare_subcolumns_with: UTF8Type - rows_cached: 10000 - keys_cached: 50 - comment: 'A column family with supercolumns, whose column and subcolumn names are UTF8 strings' - - - name: Super3 - column_type: Super - compare_with: LongType - comment: 'A column family with supercolumns, whose column names are Longs (8 bytes)' - - - name: Indexed1 - default_validation_class: LongType - column_metadata: - - name: birthdate - validator_class: LongType - index_name: birthdate_idx - index_type: KEYS +# Enable or disable inter-node encryption +# Default settings are TLS v1, RSA 1024-bit keys (it is imperative that +# users generate their own keys) TLS_RSA_WITH_AES_128_CBC_SHA as the cipher +# suite for authentication, key exchange and encryption of the actual data transfers. +# NOTE: No custom encryption options are enabled at the moment +# The available internode options are : all, none, dc, rack +# +# If set to dc cassandra will encrypt the traffic between the DCs +# If set to rack cassandra will encrypt the traffic between the racks +# +# The passwords used in these options must match the passwords used when generating +# the keystore and truststore. 
For instructions on generating these files, see: +# http://download.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore +# +server_encryption_options: + internode_encryption: none + keystore: conf/.keystore + keystore_password: cassandra + truststore: conf/.truststore + truststore_password: cassandra + # More advanced defaults below: + # protocol: TLS + # algorithm: SunX509 + # store_type: JKS + # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA] + # require_client_auth: false + +# enable or disable client/server encryption. +client_encryption_options: + enabled: false + keystore: conf/.keystore + keystore_password: cassandra + # More advanced defaults below: + # protocol: TLS + # algorithm: SunX509 + # store_type: JKS + # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA] + # require_client_auth: false + +# internode_compression controls whether traffic between nodes is +# compressed. +# can be: all - all traffic is compressed +# dc - traffic between different datacenters is compressed +# none - nothing is compressed. +internode_compression: all + +# Enable or disable tcp_nodelay for inter-dc communication. +# Disabling it will result in larger (but fewer) network packets being sent, +# reducing overhead from the TCP protocol itself, at the cost of increasing +# latency if you block for cross-datacenter responses. 
+inter_dc_tcp_nodelay: false http://git-wip-us.apache.org/repos/asf/cassandra/blob/338fa9a3/examples/client_only/src/ClientOnlyExample.java ---------------------------------------------------------------------- diff --git a/examples/client_only/src/ClientOnlyExample.java b/examples/client_only/src/ClientOnlyExample.java index d6f80ae..d73043e 100644 --- a/examples/client_only/src/ClientOnlyExample.java +++ b/examples/client_only/src/ClientOnlyExample.java @@ -19,20 +19,13 @@ import java.nio.ByteBuffer; import java.util.*; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.NamesQueryFilter; -import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.ByteBufferUtil; - -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.marshal.AsciiType; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.service.*; +import org.apache.cassandra.transport.messages.ResultMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,36 +34,35 @@ public class ClientOnlyExample { private static final Logger logger = LoggerFactory.getLogger(ClientOnlyExample.class); - private static final String KEYSPACE = "Keyspace1"; - private static final String COLUMN_FAMILY = "Standard1"; - + private static final String KEYSPACE = "keyspace1"; + private static final String COLUMN_FAMILY = "standard1"; + private static void 
startClient() throws Exception { StorageService.instance.initClient(); - // sleep for a bit so that gossip can do its thing. - try - { - Thread.sleep(10000L); - } - catch (Exception ex) - { - throw new AssertionError(ex); - } } private static void testWriting() throws Exception { + ClientState state = new ClientState(false); + state.setKeyspace(KEYSPACE); // do some writing. for (int i = 0; i < 100; i++) { - RowMutation change = new RowMutation(KEYSPACE, ByteBufferUtil.bytes(("key" + i))); - change.add(COLUMN_FAMILY, ByteBufferUtil.bytes("colb"), ByteBufferUtil.bytes(("value" + i)), 0); + QueryProcessor.process( + new StringBuilder() + .append("INSERT INTO ") + .append(COLUMN_FAMILY) + .append(" (id, name, value) VALUES ( 'key") + .append(i) + .append("', 'colb', 'value") + .append(i) + .append("' )") + .toString(), + ConsistencyLevel.QUORUM, + new QueryState(state) + ); - // don't call change.apply(). The reason is that is makes a static call into Table, which will perform - // local storage initialization, which creates local directories. - // change.apply(); - - StorageProxy.mutate(Arrays.asList(change), ConsistencyLevel.ONE); System.out.println("wrote key" + i); } System.out.println("Done writing."); @@ -79,112 +71,111 @@ public class ClientOnlyExample private static void testReading() throws Exception { // do some queries. 
+ ClientState state = new ClientState(false); + state.setKeyspace(KEYSPACE); for (int i = 0; i < 100; i++) { - List commands = new ArrayList(); - SliceByNamesReadCommand readCommand = new SliceByNamesReadCommand(KEYSPACE, ByteBufferUtil.bytes(("key" + i)), COLUMN_FAMILY, new NamesQueryFilter(ByteBufferUtil.bytes("colb"))); - readCommand.setDigestQuery(false); - commands.add(readCommand); - List rows = StorageProxy.read(commands, ConsistencyLevel.ONE); + List> rows = ((ResultMessage.Rows)QueryProcessor.process( + new StringBuilder() + .append("SELECT id, name, value FROM ") + .append(COLUMN_FAMILY) + .append(" WHERE id = 'key") + .append(i) + .append("'") + .toString(), + ConsistencyLevel.QUORUM, + new QueryState(state) + )).result.rows; + assert rows.size() == 1; - Row row = rows.get(0); - ColumnFamily cf = row.cf; - if (cf != null) - { - for (IColumn col : cf.getSortedColumns()) - { - System.out.println(ByteBufferUtil.string(col.name()) + ", " + ByteBufferUtil.string(col.value())); - } - } - else - System.err.println("This output indicates that nothing was read."); + List r = rows.get(0); + assert r.size() == 3; + System.out.println(new StringBuilder() + .append("ID: ") + .append(AsciiType.instance.compose(r.get(0))) + .append(", Name: ") + .append(AsciiType.instance.compose(r.get(1))) + .append(", Value: ") + .append(AsciiType.instance.compose(r.get(2))) + .toString()); } } /** * First, bring one or more nodes up. Then run ClientOnlyExample with these VM arguments: - * + *

* -Xmx1G * -Dcassandra.config=/Users/gary/cassandra/conf/cassandra.yaml (optional, will first look for cassandra.yaml on classpath) - * + *

* Pass "write" or "read" into the program to exercise the various methods. - * + *

* Caveats: - * + *

* 1. Much of cassandra is not reentrant. That is, you can't spin a client up, down, then back up in the same jvm. * 2. Because of the above, you still need to force-quit the process. StorageService.stopClient() doesn't (can't) - * spin everything down. + * spin everything down. */ public static void main(String args[]) throws Exception { startClient(); - setupKeyspace(createConnection()); + setupKeyspace(); testWriting(); logger.info("Writing is done. Sleeping, then will try to read."); try { - Thread.currentThread().sleep(10000); + Thread.currentThread().sleep(1000); } - catch (InterruptedException ex) + catch (InterruptedException ex) { throw new RuntimeException(ex); } - + testReading(); - + // no need to do this: // StorageService.instance().decommission(); // do this instead: StorageService.instance.stopClient(); System.exit(0); // the only way to really stop the process. } - - /** - * This method will fail if the keyspace already exists - */ - private static void setupKeyspace(Cassandra.Iface client) throws TException, InvalidRequestException - { - List cfDefList = new ArrayList(); - CfDef columnFamily = new CfDef(KEYSPACE, COLUMN_FAMILY); - cfDefList.add(columnFamily); - - try - { - client.system_add_keyspace(new KsDef(KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", 1, cfDefList)); - int magnitude = client.describe_ring(KEYSPACE).size(); - try - { - Thread.sleep(1000 * magnitude); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - } - catch (InvalidRequestException probablyExists) - { - logger.warn("Problem creating keyspace: " + probablyExists.getMessage()); - } - } - private static Cassandra.Iface createConnection() throws TTransportException + private static void setupKeyspace() throws RequestExecutionException, RequestValidationException, InterruptedException { - if (System.getProperty("cassandra.host") == null || System.getProperty("cassandra.port") == null) + if (QueryProcessor.process( + new StringBuilder() + 
.append("SELECT * FROM system.schema_keyspaces WHERE keyspace_name='") + .append(KEYSPACE) + .append("'") + .toString(), ConsistencyLevel.QUORUM) + .isEmpty()) { - logger.warn("cassandra.host or cassandra.port is not defined, using default"); + QueryProcessor.process(new StringBuilder() + .append("CREATE KEYSPACE ") + .append(KEYSPACE) + .append(" WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' }") + .toString(), ConsistencyLevel.QUORUM); + Thread.sleep(1000); } - return createConnection( System.getProperty("cassandra.host","localhost"), - Integer.valueOf(System.getProperty("cassandra.port","9160")), - Boolean.valueOf(System.getProperty("cassandra.framed", "true")) ); - } - private static Cassandra.Client createConnection(String host, Integer port, boolean framed) throws TTransportException - { - TSocket socket = new TSocket(host, port); - TTransport trans = framed ? new TFramedTransport(socket) : socket; - trans.open(); - TProtocol protocol = new TBinaryProtocol(trans); + if (QueryProcessor.process( + new StringBuilder() + .append("SELECT * FROM system.schema_columnfamilies WHERE keyspace_name='") + .append(KEYSPACE) + .append("' AND columnfamily_name='") + .append(COLUMN_FAMILY) + .append("'") + .toString(), ConsistencyLevel.QUORUM) + .isEmpty()) + { + ClientState state = new ClientState(); + state.setKeyspace(KEYSPACE); - return new Cassandra.Client(protocol); + QueryProcessor.process(new StringBuilder() + .append("CREATE TABLE ") + .append(COLUMN_FAMILY) + .append(" ( id ascii PRIMARY KEY, name ascii, value ascii )") + .toString(), ConsistencyLevel.QUORUM, new QueryState(state)); + Thread.sleep(1000); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/338fa9a3/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 9cd4c87..a834506 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -1060,12 +1060,14 @@ public class SelectStatement implements CQLStatement { stmt.isKeyRange = true; boolean hasEq = false; - SecondaryIndexManager idxManager = Table.open(keyspace()).getColumnFamilyStore(columnFamily()).indexManager; Set indexedNames = new HashSet(); - for (SecondaryIndex index : idxManager.getIndexes()) + indexedNames.add(cfm.getKeyName()); + for (ColumnDefinition cfdef : cfm.getColumn_metadata().values()) { - for (ColumnDefinition cdef : index.getColumnDefs()) - indexedNames.add(cdef.name); + if (cfdef.getIndexType() != null) + { + indexedNames.add(cfdef.name); + } } // Note: we cannot use idxManager.indexes() methods because we don't have a complete column name at this point, we only http://git-wip-us.apache.org/repos/asf/cassandra/blob/338fa9a3/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 1ce8bd7..ae920e1 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -556,6 +556,16 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } + public boolean isFatClient(InetAddress endpoint) + { + EndpointState epState = endpointStateMap.get(endpoint); + if (epState == null) + { + return false; + } + return !isDeadState(epState) && !epState.isAlive() && !StorageService.instance.getTokenMetadata().isMember(endpoint); + } + private void doStatusCheck() { long now = System.currentTimeMillis(); @@ -574,7 +584,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean // check if this is a fat client. 
fat clients are removed automatically from // gossip after FatClientTimeout. Do not remove dead states here. - if (!isDeadState(epState) && !epState.isAlive() && !StorageService.instance.getTokenMetadata().isMember(endpoint) && !justRemovedEndpoints.containsKey(endpoint) && (duration > FatClientTimeout)) + if (isFatClient(endpoint) && !justRemovedEndpoints.containsKey(endpoint) && (duration > FatClientTimeout)) { logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip"); removeEndpoint(endpoint); // will put it in justRemovedEndpoints to respect quarantine delay http://git-wip-us.apache.org/repos/asf/cassandra/blob/338fa9a3/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index c0361bf..7a15bda 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.ReadCommand; @@ -126,11 +127,18 @@ public abstract class AbstractReadExecutor public static AbstractReadExecutor getReadExecutor(ReadCommand command, ConsistencyLevel consistency_level) throws UnavailableException { Table table = Table.open(command.table); - ColumnFamilyStore cfs = table.getColumnFamilyStore(command.cfName); List allReplicas = StorageProxy.getLiveSortedEndpoints(table, command.key); - List queryTargets = consistency_level.filterForQuery(table, allReplicas, 
cfs.metadata.newReadRepairDecision()); + CFMetaData metaData = Schema.instance.getCFMetaData(command.table, command.cfName); + List queryTargets = consistency_level.filterForQuery(table, allReplicas, metaData.newReadRepairDecision()); + + if (StorageService.instance.isClientMode()) + { + return new DefaultReadExecutor(null, command, consistency_level, allReplicas, queryTargets); + } + + ColumnFamilyStore cfs = table.getColumnFamilyStore(command.cfName); - switch (cfs.metadata.getSpeculativeRetry().type) + switch (metaData.getSpeculativeRetry().type) { case ALWAYS: return new SpeculateAlwaysExecutor(cfs, command, consistency_level, allReplicas, queryTargets); http://git-wip-us.apache.org/repos/asf/cassandra/blob/338fa9a3/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index 23e7842..a7b2fcf 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -116,6 +116,9 @@ public class MigrationManager implements IEndpointStateChangeSubscriber if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_117) return; + if (Gossiper.instance.isFatClient(endpoint)) + return; + if (Schema.instance.getVersion().equals(theirVersion)) return; http://git-wip-us.apache.org/repos/asf/cassandra/blob/338fa9a3/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index d2d3c2c..7e2fe05 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -372,10 +372,38 @@ public class StorageService 
extends NotificationBroadcasterSupport implements IE public synchronized void initClient() throws IOException, ConfigurationException { - initClient(RING_DELAY); + // Don't sleep for a fixed ring delay; instead poll below until gossip sees a live non-fat-client node and schema migrations have settled. + initClient(0); + + try + { + // sleep a while to allow gossip to warm up (the other nodes need to know about this one before they can reply). + boolean isUp = false; + while (!isUp) + { + Thread.sleep(1000); + for (InetAddress address : Gossiper.instance.getLiveMembers()) + { + if (!Gossiper.instance.isFatClient(address)) + { + isUp = true; + } + } + } + + // sleep until any schema migrations have finished + while (!MigrationManager.isReadyForBootstrap()) + { + Thread.sleep(1000); + } + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } } - public synchronized void initClient(int delay) throws IOException, ConfigurationException + public synchronized void initClient(int ringDelay) throws IOException, ConfigurationException { if (initialized) { @@ -388,14 +416,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.info("Starting up client gossip"); setMode(Mode.CLIENT, false); Gossiper.instance.register(this); - Gossiper.instance.start((int)(System.currentTimeMillis() / 1000)); // needed for node-ring gathering. + Gossiper.instance.register(migrationManager); + Gossiper.instance.start((int) (System.currentTimeMillis() / 1000)); // needed for node-ring gathering. Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion()); - MessagingService.instance().listen(FBUtilities.getLocalAddress()); + Schema.instance.updateVersion(); - // sleep a while to allow gossip to warm up (the other nodes need to know about this one before they can reply).
+ MessagingService.instance().listen(FBUtilities.getLocalAddress()); try { - Thread.sleep(delay); + Thread.sleep(ringDelay); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/338fa9a3/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java index 298f1c7..3ac9ecb 100644 --- a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java +++ b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java @@ -35,7 +35,7 @@ public class DynamicEndpointSnitchTest public void testSnitch() throws InterruptedException, IOException, ConfigurationException { // do this because SS needs to be initialized before DES can work properly. - StorageService.instance.initClient(0); + StorageService.instance.initClient(); int sleeptime = 150; SimpleSnitch ss = new SimpleSnitch(); DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/338fa9a3/test/unit/org/apache/cassandra/service/InitClientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/InitClientTest.java b/test/unit/org/apache/cassandra/service/InitClientTest.java index 7d44cd8..23c4b1c 100644 --- a/test/unit/org/apache/cassandra/service/InitClientTest.java +++ b/test/unit/org/apache/cassandra/service/InitClientTest.java @@ -30,6 +30,6 @@ public class InitClientTest // extends CleanupHelper @Test public void testInitClientStartup() throws IOException, ConfigurationException { - StorageService.instance.initClient(0); + StorageService.instance.initClient(); } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/338fa9a3/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java b/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java index 19efe3a..b473c18 100644 --- a/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java +++ b/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java @@ -35,7 +35,7 @@ public class StorageServiceClientTest { SchemaLoader.mkdirs(); SchemaLoader.cleanup(); - StorageService.instance.initClient(0); + StorageService.instance.initClient(); // verify that no storage directories were created. for (String path : DatabaseDescriptor.getAllDataFileLocations())