From commits-return-213763-archive-asf-public=cust-asf.ponee.io@cassandra.apache.org Sat Sep 1 03:35:58 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 6840A18072F for ; Sat, 1 Sep 2018 03:35:56 +0200 (CEST) Received: (qmail 62987 invoked by uid 500); 1 Sep 2018 01:35:55 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 62685 invoked by uid 99); 1 Sep 2018 01:35:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 01 Sep 2018 01:35:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CBF10E04C9; Sat, 1 Sep 2018 01:35:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aweisberg@apache.org To: commits@cassandra.apache.org Date: Sat, 01 Sep 2018 01:36:06 -0000 Message-Id: <4dd0e48946ed4e8199ea64116362df85@git.apache.org> In-Reply-To: <9d73255749744ffca4f5dbb72a044138@git.apache.org> References: <9d73255749744ffca4f5dbb72a044138@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [14/18] cassandra git commit: Transient Replication and Cheap Quorums http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java index db73b4f..8eb8603 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java @@ -21,6 +21,9 @@ import java.util.Collection; import java.util.Set; import java.util.UUID; +import com.google.common.base.Preconditions; + +import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.db.RowIndexEntry; @@ -85,13 +88,15 @@ public class BigFormat implements SSTableFormat long keyCount, long repairedAt, UUID pendingRepair, + boolean isTransient, TableMetadataRef metadata, MetadataCollector metadataCollector, SerializationHeader header, Collection observers, LifecycleTransaction txn) { - return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers, txn); + SSTable.validateRepairedMetadata(repairedAt, pendingRepair, isTransient); + return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers, txn); } } @@ -120,7 +125,7 @@ public class BigFormat implements SSTableFormat // mb (3.0.7, 3.7): commit log lower bound included // mc (3.0.8, 3.9): commit log intervals included - // na (4.0.0): uncompressed chunks, pending repair session, checksummed sstable metadata file, new Bloomfilter format + // na (4.0.0): uncompressed chunks, pending repair session, isTransient, checksummed sstable metadata file, new Bloomfilter format // // NOTE: when adding a new version, please add that to LegacySSTableTest, too. @@ -131,6 +136,7 @@ public class BigFormat implements SSTableFormat public final boolean hasMaxCompressedLength; private final boolean hasPendingRepair; private final boolean hasMetadataChecksum; + private final boolean hasIsTransient; /** * CASSANDRA-9067: 4.0 bloom filter representation changed (two longs just swapped) * have no 'static' bits caused by using the same upper bits for both bloom filter and token distribution. @@ -148,6 +154,7 @@ public class BigFormat implements SSTableFormat hasCommitLogIntervals = version.compareTo("mc") >= 0; hasMaxCompressedLength = version.compareTo("na") >= 0; hasPendingRepair = version.compareTo("na") >= 0; + hasIsTransient = version.compareTo("na") >= 0; hasMetadataChecksum = version.compareTo("na") >= 0; hasOldBfFormat = version.compareTo("na") < 0; } @@ -176,6 +183,12 @@ public class BigFormat implements SSTableFormat } @Override + public boolean hasIsTransient() + { + return hasIsTransient; + } + + @Override public int correspondingMessagingVersion() { return correspondingMessagingVersion; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index b5488ed..7513e95 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -68,13 +68,14 @@ public class BigTableWriter extends SSTableWriter long keyCount, long repairedAt, UUID pendingRepair, + boolean isTransient, TableMetadataRef metadata, MetadataCollector metadataCollector, SerializationHeader header, Collection observers, LifecycleTransaction txn) { - super(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers); + super(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers); txn.trackNew(this); // must track before any files are created if (compression) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java index 6a40d94..eb7b2c7 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java @@ -71,7 +71,7 @@ public interface IMetadataSerializer void mutateLevel(Descriptor descriptor, int newLevel) throws IOException; /** - * Mutate the repairedAt time and pendingRepair ID + * Mutate the repairedAt time, pendingRepair ID, and transient status */ - void mutateRepaired(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair) throws IOException; + public void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index 9d9c1a8..36c218b 100755 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -83,7 +83,8 @@ public class MetadataCollector implements PartitionStatisticsCollector ActiveRepairService.UNREPAIRED_SSTABLE, -1, -1, - null); + null, + false); } protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram(); @@ -272,7 +273,7 @@ public class MetadataCollector implements PartitionStatisticsCollector this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards; } - public Map finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, UUID pendingRepair, SerializationHeader header) + public Map finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, UUID pendingRepair, boolean isTransient, SerializationHeader header) { Map components = new EnumMap<>(MetadataType.class); components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance)); @@ -294,7 +295,8 @@ public class MetadataCollector implements PartitionStatisticsCollector repairedAt, totalColumnsSet, totalRows, - pendingRepair)); + pendingRepair, + isTransient)); components.put(MetadataType.COMPACTION, new CompactionMetadata(cardinality)); components.put(MetadataType.HEADER, header.toComponent()); return components; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java index 74923a0..f76db2d 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java @@ -230,7 +230,7 @@ public class MetadataSerializer implements IMetadataSerializer rewriteSSTableMetadata(descriptor, currentComponents); } - public void mutateRepaired(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair) throws IOException + public void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException { if (logger.isTraceEnabled()) logger.trace("Mutating {} to repairedAt time {} and pendingRepair {}", @@ -238,7 +238,7 @@ public class MetadataSerializer implements IMetadataSerializer Map currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class)); StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS); // mutate time & id - currentComponents.put(MetadataType.STATS, stats.mutateRepairedAt(newRepairedAt).mutatePendingRepair(newPendingRepair)); + currentComponents.put(MetadataType.STATS, stats.mutateRepairedMetadata(newRepairedAt, newPendingRepair, isTransient)); rewriteSSTableMetadata(descriptor, currentComponents); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java index 2b8ebef..f14fb5d 100755 --- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java @@ -64,6 +64,7 @@ public class StatsMetadata extends MetadataComponent public final long totalColumnsSet; public final long totalRows; public final UUID pendingRepair; + public final boolean isTransient; public StatsMetadata(EstimatedHistogram estimatedPartitionSize, EstimatedHistogram estimatedColumnCount, @@ -83,7 +84,8 @@ public class StatsMetadata extends MetadataComponent long repairedAt, long totalColumnsSet, long totalRows, - UUID pendingRepair) + UUID pendingRepair, + boolean isTransient) { this.estimatedPartitionSize = estimatedPartitionSize; this.estimatedColumnCount = estimatedColumnCount; @@ -104,6 +106,7 @@ public class StatsMetadata extends MetadataComponent this.totalColumnsSet = totalColumnsSet; this.totalRows = totalRows; this.pendingRepair = pendingRepair; + this.isTransient = isTransient; } public MetadataType getType() @@ -155,10 +158,11 @@ public class StatsMetadata extends MetadataComponent repairedAt, totalColumnsSet, totalRows, - pendingRepair); + pendingRepair, + isTransient); } - public StatsMetadata mutateRepairedAt(long newRepairedAt) + public StatsMetadata mutateRepairedMetadata(long newRepairedAt, UUID newPendingRepair, boolean newIsTransient) { return new StatsMetadata(estimatedPartitionSize, estimatedColumnCount, @@ -178,30 +182,8 @@ public class StatsMetadata extends MetadataComponent newRepairedAt, totalColumnsSet, totalRows, - pendingRepair); - } - - public StatsMetadata mutatePendingRepair(UUID newPendingRepair) - { - return new StatsMetadata(estimatedPartitionSize, - estimatedColumnCount, - commitLogIntervals, - minTimestamp, - maxTimestamp, - minLocalDeletionTime, - maxLocalDeletionTime, - minTTL, - maxTTL, - compressionRatio, - estimatedTombstoneDropTime, - sstableLevel, - minClusteringValues, - maxClusteringValues, - hasLegacyCounterShards, - repairedAt, - totalColumnsSet, - totalRows, - newPendingRepair); + newPendingRepair, + newIsTransient); } @Override @@ -292,6 +274,12 @@ public class StatsMetadata extends MetadataComponent if (component.pendingRepair != null) size += UUIDSerializer.serializer.serializedSize(component.pendingRepair, 0); } + + if (version.hasIsTransient()) + { + size += TypeSizes.sizeof(component.isTransient); + } + return size; } @@ -338,6 +326,11 @@ public class StatsMetadata extends MetadataComponent out.writeByte(0); } } + + if (version.hasIsTransient()) + { + out.writeBoolean(component.isTransient); + } } public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException @@ -386,6 +379,8 @@ public class StatsMetadata extends MetadataComponent pendingRepair = UUIDSerializer.serializer.deserialize(in, 0); } + boolean isTransient = version.hasIsTransient() && in.readBoolean(); + return new StatsMetadata(partitionSizes, columnCounts, commitLogIntervals, @@ -404,7 +399,8 @@ public class StatsMetadata extends MetadataComponent repairedAt, totalColumnsSet, totalRows, - pendingRepair); + pendingRepair, + isTransient); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java index 2ee8eea..2e7408b 100644 --- a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java @@ -17,13 +17,12 @@ */ package org.apache.cassandra.locator; -import java.util.*; - +import com.google.common.collect.Iterables; import org.apache.cassandra.config.DatabaseDescriptor; public abstract class AbstractEndpointSnitch implements IEndpointSnitch { - public abstract int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2); + public abstract int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2); /** * Sorts the Collection of node addresses by proximity to the given address @@ -31,27 +30,9 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch * @param unsortedAddress the nodes to sort * @return a new sorted List */ - public List getSortedListByProximity(InetAddressAndPort address, Collection unsortedAddress) - { - List preferred = new ArrayList<>(unsortedAddress); - sortByProximity(address, preferred); - return preferred; - } - - /** - * Sorts the List of node addresses, in-place, by proximity to the given address - * @param address the address to sort the proximity by - * @param addresses the nodes to sort - */ - public void sortByProximity(final InetAddressAndPort address, List addresses) + public > C sortedByProximity(final InetAddressAndPort address, C unsortedAddress) { - Collections.sort(addresses, new Comparator() - { - public int compare(InetAddressAndPort a1, InetAddressAndPort a2) - { - return compareEndpoints(address, a1, a2); - } - }); + return unsortedAddress.sorted((r1, r2) -> compareEndpoints(address, r1, r2)); } public void gossiperStarting() @@ -59,7 +40,7 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch // noop by default } - public boolean isWorthMergingForRangeQuery(List merged, List l1, List l2) + public boolean isWorthMergingForRangeQuery(ReplicaCollection merged, ReplicaCollection l1, ReplicaCollection l2) { // Querying remote DC is likely to be an order of magnitude slower than // querying locally, so 2 queries to local nodes is likely to still be @@ -70,14 +51,9 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch : true; } - private boolean hasRemoteNode(List l) + private boolean hasRemoteNode(ReplicaCollection l) { String localDc = DatabaseDescriptor.getLocalDataCenter(); - for (InetAddressAndPort ep : l) - { - if (!localDc.equals(getDatacenter(ep))) - return true; - } - return false; + return Iterables.any(l, replica -> !localDc.equals(getDatacenter(replica))); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java b/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java index e91f6ac..08c41f0 100644 --- a/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java +++ b/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java @@ -37,8 +37,11 @@ public abstract class AbstractNetworkTopologySnitch extends AbstractEndpointSnit */ abstract public String getDatacenter(InetAddressAndPort endpoint); - public int compareEndpoints(InetAddressAndPort address, InetAddressAndPort a1, InetAddressAndPort a2) + @Override + public int compareEndpoints(InetAddressAndPort address, Replica r1, Replica r2) { + InetAddressAndPort a1 = r1.endpoint(); + InetAddressAndPort a2 = r2.endpoint(); if (address.equals(a1) && !address.equals(a2)) return -1; if (address.equals(a2) && !address.equals(a1)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java new file mode 100644 index 0000000..ecf1296 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import com.google.common.collect.Iterables; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.BinaryOperator; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collector; +import java.util.stream.Stream; + +/** + * A collection like class for Replica objects. Since the Replica class contains inetaddress, range, and + * transient replication status, basic contains and remove methods can be ambiguous. Replicas forces you + * to be explicit about what you're checking the container for, or removing from it. + */ +public abstract class AbstractReplicaCollection> implements ReplicaCollection +{ + protected static final List EMPTY_LIST = new ArrayList<>(); // since immutable, can safely return this to avoid megamorphic callsites + + public static , B extends Builder> Collector collector(Set characteristics, Supplier supplier) + { + return new Collector() + { + private final BiConsumer accumulator = Builder::add; + private final BinaryOperator combiner = (a, b) -> { a.addAll(b.mutable); return a; }; + private final Function finisher = Builder::build; + public Supplier supplier() { return supplier; } + public BiConsumer accumulator() { return accumulator; } + public BinaryOperator combiner() { return combiner; } + public Function finisher() { return finisher; } + public Set characteristics() { return characteristics; } + }; + } + + protected final List list; + protected final boolean isSnapshot; + protected AbstractReplicaCollection(List list, boolean isSnapshot) + { + this.list = list; + this.isSnapshot = isSnapshot; + } + + // if subList == null, should return self (or a clone thereof) + protected abstract C snapshot(List subList); + protected abstract C self(); + /** + * construct a new Mutable of our own type, so that we can concatenate + * TODO: this isn't terribly pretty, but we need sometimes to select / merge two Endpoints of unknown type; + */ + public abstract Mutable newMutable(int initialCapacity); + + + public C snapshot() + { + return isSnapshot ? self() + : snapshot(list.isEmpty() ? EMPTY_LIST + : new ArrayList<>(list)); + } + + public final C subList(int start, int end) + { + List subList; + if (isSnapshot) + { + if (start == 0 && end == size()) return self(); + else if (start == end) subList = EMPTY_LIST; + else subList = list.subList(start, end); + } + else + { + if (start == end) subList = EMPTY_LIST; + else subList = new ArrayList<>(list.subList(start, end)); // TODO: we could take a subList here, but comodification checks stop us + } + return snapshot(subList); + } + + public final C filter(Predicate predicate) + { + return filter(predicate, Integer.MAX_VALUE); + } + + public final C filter(Predicate predicate, int limit) + { + if (isEmpty()) + return snapshot(); + + List copy = null; + int beginRun = -1, endRun = -1; + int i = 0; + for (; i < list.size() ; ++i) + { + Replica replica = list.get(i); + if (predicate.test(replica)) + { + if (copy != null) + copy.add(replica); + else if (beginRun < 0) + beginRun = i; + else if (endRun > 0) + { + copy = new ArrayList<>(Math.min(limit, (list.size() - i) + (endRun - beginRun))); + for (int j = beginRun ; j < endRun ; ++j) + copy.add(list.get(j)); + copy.add(list.get(i)); + } + if (--limit == 0) + { + ++i; + break; + } + } + else if (beginRun >= 0 && endRun < 0) + endRun = i; + } + + if (beginRun < 0) + beginRun = endRun = 0; + if (endRun < 0) + endRun = i; + if (copy == null) + return subList(beginRun, endRun); + return snapshot(copy); + } + + public final class Select + { + private final List result; + public Select(int expectedSize) + { + this.result = new ArrayList<>(expectedSize); + } + + /** + * Add matching replica to the result; this predicate should be mutually exclusive with all prior predicates. + * Stop once we have targetSize replicas in total, including preceding calls + */ + public Select add(Predicate predicate, int targetSize) + { + assert !Iterables.any(result, predicate::test); + for (int i = 0 ; result.size() < targetSize && i < list.size() ; ++i) + if (predicate.test(list.get(i))) + result.add(list.get(i)); + return this; + } + public Select add(Predicate predicate) + { + return add(predicate, Integer.MAX_VALUE); + } + public C get() + { + return snapshot(result); + } + } + + /** + * An efficient method for selecting a subset of replica via a sequence of filters. + * + * Example: select().add(filter1).add(filter2, 3).get(); + * + * @return a Select object + */ + public final Select select() + { + return select(list.size()); + } + public final Select select(int expectedSize) + { + return new Select(expectedSize); + } + + public final C sorted(Comparator comparator) + { + List copy = new ArrayList<>(list); + copy.sort(comparator); + return snapshot(copy); + } + + public final Replica get(int i) + { + return list.get(i); + } + + public final int size() + { + return list.size(); + } + + public final boolean isEmpty() + { + return list.isEmpty(); + } + + public final Iterator iterator() + { + return list.iterator(); + } + + public final Stream stream() { return list.stream(); } + + public final boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof AbstractReplicaCollection)) + { + if (!(o instanceof ReplicaCollection)) + return false; + + ReplicaCollection that = (ReplicaCollection) o; + return Iterables.elementsEqual(this, that); + } + AbstractReplicaCollection that = (AbstractReplicaCollection) o; + return Objects.equals(list, that.list); + } + + public final int hashCode() + { + return list.hashCode(); + } + + @Override + public final String toString() + { + return list.toString(); + } + + static > C concat(C replicas, C extraReplicas, Mutable.Conflict ignoreConflicts) + { + if (extraReplicas.isEmpty()) + return replicas; + if (replicas.isEmpty()) + return extraReplicas; + Mutable mutable = replicas.newMutable(replicas.size() + extraReplicas.size()); + mutable.addAll(replicas); + mutable.addAll(extraReplicas, ignoreConflicts); + return mutable.asImmutableView(); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index 3e9b5bb..0ddc0a4 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -22,8 +22,8 @@ import java.lang.reflect.InvocationTargetException; import java.util.*; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; +import com.google.common.base.Preconditions; +import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,9 +73,9 @@ public abstract class AbstractReplicationStrategy // lazy-initialize keyspace itself since we don't create them until after the replication strategies } - private final Map> cachedEndpoints = new NonBlockingHashMap>(); + private final Map cachedReplicas = new NonBlockingHashMap<>(); - public ArrayList getCachedEndpoints(Token t) + public EndpointsForRange getCachedReplicas(Token t) { long lastVersion = tokenMetadata.getRingVersion(); @@ -86,13 +86,13 @@ public abstract class AbstractReplicationStrategy if (lastVersion > lastInvalidatedVersion) { logger.trace("clearing cached endpoints"); - cachedEndpoints.clear(); + cachedReplicas.clear(); lastInvalidatedVersion = lastVersion; } } } - return cachedEndpoints.get(t); + return cachedReplicas.get(t); } /** @@ -102,64 +102,65 @@ public abstract class AbstractReplicationStrategy * @param searchPosition the position the natural endpoints are requested for * @return a copy of the natural endpoints for the given token */ - public ArrayList getNaturalEndpoints(RingPosition searchPosition) + public EndpointsForToken getNaturalReplicasForToken(RingPosition searchPosition) + { + return getNaturalReplicas(searchPosition).forToken(searchPosition.getToken()); + } + + public EndpointsForRange getNaturalReplicas(RingPosition searchPosition) { Token searchToken = searchPosition.getToken(); Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken); - ArrayList endpoints = getCachedEndpoints(keyToken); + EndpointsForRange endpoints = getCachedReplicas(keyToken); if (endpoints == null) { TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap(); // if our cache got invalidated, it's possible there is a new token to account for too keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken); - endpoints = new ArrayList(calculateNaturalEndpoints(searchToken, tm)); - cachedEndpoints.put(keyToken, endpoints); + endpoints = calculateNaturalReplicas(searchToken, tm); + cachedReplicas.put(keyToken, endpoints); } - return new ArrayList(endpoints); + return endpoints; } /** * calculate the natural endpoints for the given token * - * @see #getNaturalEndpoints(org.apache.cassandra.dht.RingPosition) + * @see #getNaturalReplicasForToken(org.apache.cassandra.dht.RingPosition) * * @param searchToken the token the natural endpoints are requested for * @return a copy of the natural endpoints for the given token */ - public abstract List calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata); + public abstract EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata); - public AbstractWriteResponseHandler getWriteResponseHandler(Collection naturalEndpoints, - Collection pendingEndpoints, - ConsistencyLevel consistency_level, + public AbstractWriteResponseHandler getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout, Runnable callback, WriteType writeType, long queryStartNanoTime) { - return getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel()); + return getWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel()); } - public AbstractWriteResponseHandler getWriteResponseHandler(Collection naturalEndpoints, - Collection pendingEndpoints, - ConsistencyLevel consistency_level, + public AbstractWriteResponseHandler getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout, Runnable callback, WriteType writeType, long queryStartNanoTime, ConsistencyLevel idealConsistencyLevel) { AbstractWriteResponseHandler resultResponseHandler; - if (consistency_level.isDatacenterLocal()) + if (replicaLayout.consistencyLevel.isDatacenterLocal()) { // block for in this context will be localnodes block. - resultResponseHandler = new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); + resultResponseHandler = new DatacenterWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime); } - else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy)) + else if (replicaLayout.consistencyLevel == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy)) { - resultResponseHandler = new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); + resultResponseHandler = new DatacenterSyncWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime); } else { - resultResponseHandler = new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); + resultResponseHandler = new WriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime); } //Check if tracking the ideal consistency level is configured @@ -168,16 +169,14 @@ public abstract class AbstractReplicationStrategy //If ideal and requested are the same just use this handler to track the ideal consistency level //This is also used so that the ideal consistency level handler when constructed knows it is the ideal //one for tracking purposes - if (idealConsistencyLevel == consistency_level) + if (idealConsistencyLevel == replicaLayout.consistencyLevel) { resultResponseHandler.setIdealCLResponseHandler(resultResponseHandler); } else { //Construct a delegate response handler to use to track the ideal consistency level - AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(naturalEndpoints, - pendingEndpoints, - idealConsistencyLevel, + AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(replicaLayout.withConsistencyLevel(idealConsistencyLevel), callback, writeType, queryStartNanoTime, @@ -202,7 +201,12 @@ public abstract class AbstractReplicationStrategy * * @return the replication factor */ - public abstract int getReplicationFactor(); + public abstract ReplicationFactor getReplicationFactor(); + + public boolean hasTransientReplicas() + { + return getReplicationFactor().hasTransientReplicas(); + } /* * NOTE: this is pretty inefficient. also the inverse (getRangeAddresses) below. @@ -210,53 +214,81 @@ public abstract class AbstractReplicationStrategy * (fixing this would probably require merging tokenmetadata into replicationstrategy, * so we could cache/invalidate cleanly.) */ - public Multimap> getAddressRanges(TokenMetadata metadata) + public RangesByEndpoint getAddressReplicas(TokenMetadata metadata) { - Multimap> map = HashMultimap.create(); + RangesByEndpoint.Mutable map = new RangesByEndpoint.Mutable(); for (Token token : metadata.sortedTokens()) { Range range = metadata.getPrimaryRangeFor(token); - for (InetAddressAndPort ep : calculateNaturalEndpoints(token, metadata)) + for (Replica replica : calculateNaturalReplicas(token, metadata)) { - map.put(ep, range); + // LocalStrategy always returns (min, min] ranges for it's replicas, so we skip the check here + Preconditions.checkState(range.equals(replica.range()) || this instanceof LocalStrategy); + map.put(replica.endpoint(), replica); } } - return map; + return map.asImmutableView(); } - public Multimap, InetAddressAndPort> getRangeAddresses(TokenMetadata metadata) + public RangesAtEndpoint getAddressReplicas(TokenMetadata metadata, InetAddressAndPort endpoint) { - Multimap, InetAddressAndPort> map = HashMultimap.create(); + RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint); + for (Token token : metadata.sortedTokens()) + { + Range range = metadata.getPrimaryRangeFor(token); + Replica replica = calculateNaturalReplicas(token, metadata) + .byEndpoint().get(endpoint); + if (replica != null) + { + // LocalStrategy always returns (min, min] ranges for it's replicas, so we skip the check here + Preconditions.checkState(range.equals(replica.range()) || this instanceof LocalStrategy); + builder.add(replica, Conflict.DUPLICATE); + } + } + return builder.build(); + } + + + public EndpointsByRange getRangeAddresses(TokenMetadata metadata) + { + EndpointsByRange.Mutable map = new EndpointsByRange.Mutable(); for (Token token : metadata.sortedTokens()) { Range range = metadata.getPrimaryRangeFor(token); - for (InetAddressAndPort ep : calculateNaturalEndpoints(token, metadata)) + for (Replica replica : calculateNaturalReplicas(token, metadata)) { - map.put(range, ep); + // LocalStrategy always returns (min, min] ranges for it's replicas, so we skip the check here + Preconditions.checkState(range.equals(replica.range()) || this instanceof LocalStrategy); + map.put(range, replica); } } - return map; + return map.asImmutableView(); } - public Multimap> getAddressRanges() + public RangesByEndpoint getAddressReplicas() { - return getAddressRanges(tokenMetadata.cloneOnlyTokenMap()); + return getAddressReplicas(tokenMetadata.cloneOnlyTokenMap()); } - public Collection> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddressAndPort pendingAddress) + public RangesAtEndpoint getAddressReplicas(InetAddressAndPort endpoint) { - return getPendingAddressRanges(metadata, Arrays.asList(pendingToken), pendingAddress); + return getAddressReplicas(tokenMetadata.cloneOnlyTokenMap(), endpoint); } - public Collection> getPendingAddressRanges(TokenMetadata metadata, Collection pendingTokens, InetAddressAndPort pendingAddress) + public RangesAtEndpoint getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddressAndPort pendingAddress) + { + return getPendingAddressRanges(metadata, Collections.singleton(pendingToken), pendingAddress); + } + + public RangesAtEndpoint getPendingAddressRanges(TokenMetadata metadata, Collection pendingTokens, InetAddressAndPort pendingAddress) { TokenMetadata temp = metadata.cloneOnlyTokenMap(); temp.updateNormalTokens(pendingTokens, pendingAddress); - return getAddressRanges(temp).get(pendingAddress); + return getAddressReplicas(temp, pendingAddress); } public abstract void validateOptions() throws ConfigurationException; @@ -329,6 +361,10 @@ public abstract class AbstractReplicationStrategy AbstractReplicationStrategy strategy = createInternal(keyspaceName, strategyClass, tokenMetadata, snitch, strategyOptions); strategy.validateExpectedOptions(); strategy.validateOptions(); + if (strategy.hasTransientReplicas() && !DatabaseDescriptor.isTransientReplicationEnabled()) + { + throw new ConfigurationException("Transient replication is disabled. Enable in cassandra.yaml to use."); + } } public static Class getClass(String cls) throws ConfigurationException @@ -344,21 +380,23 @@ public abstract class AbstractReplicationStrategy public boolean hasSameSettings(AbstractReplicationStrategy other) { - return getClass().equals(other.getClass()) && getReplicationFactor() == other.getReplicationFactor(); + return getClass().equals(other.getClass()) && getReplicationFactor().equals(other.getReplicationFactor()); } - protected void validateReplicationFactor(String rf) throws ConfigurationException + protected void validateReplicationFactor(String s) throws ConfigurationException { try { - if (Integer.parseInt(rf) < 0) + ReplicationFactor rf = ReplicationFactor.fromString(s); + if (rf.hasTransientReplicas()) { - throw new ConfigurationException("Replication factor must be non-negative; found " + rf); + if (DatabaseDescriptor.getNumTokens() > 1) + throw new ConfigurationException(String.format("Transient replication is not supported with vnodes yet")); } } - catch (NumberFormatException e2) + catch (IllegalArgumentException e) { - throw new ConfigurationException("Replication factor must be numeric; found " + rf); + throw new ConfigurationException(e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java index 010c892..d35f1fb 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java @@ -42,7 +42,6 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; - /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector */ @@ -185,55 +184,38 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa return subsnitch.getDatacenter(endpoint); } - public List getSortedListByProximity(final InetAddressAndPort address, Collection addresses) - { - List list = new ArrayList<>(addresses); - sortByProximity(address, list); - return list; - } - @Override - public void sortByProximity(final InetAddressAndPort address, List addresses) + public > C sortedByProximity(final InetAddressAndPort address, C unsortedAddresses) { assert address.equals(FBUtilities.getBroadcastAddressAndPort()); // we only know about ourself - if (dynamicBadnessThreshold == 0) - { - sortByProximityWithScore(address, addresses); - } - else - { - sortByProximityWithBadness(address, addresses); - } + return dynamicBadnessThreshold == 0 + ? sortedByProximityWithScore(address, unsortedAddresses) + : sortedByProximityWithBadness(address, unsortedAddresses); } - private void sortByProximityWithScore(final InetAddressAndPort address, List addresses) + private > C sortedByProximityWithScore(final InetAddressAndPort address, C unsortedAddresses) { // Scores can change concurrently from a call to this method. But Collections.sort() expects // its comparator to be "stable", that is 2 endpoint should compare the same way for the duration // of the sort() call. As we copy the scores map on write, it is thus enough to alias the current // version of it during this call. final HashMap scores = this.scores; - Collections.sort(addresses, new Comparator() - { - public int compare(InetAddressAndPort a1, InetAddressAndPort a2) - { - return compareEndpoints(address, a1, a2, scores); - } - }); + return unsortedAddresses.sorted((r1, r2) -> compareEndpoints(address, r1, r2, scores)); } - private void sortByProximityWithBadness(final InetAddressAndPort address, List addresses) + private > C sortedByProximityWithBadness(final InetAddressAndPort address, C replicas) { - if (addresses.size() < 2) - return; + if (replicas.size() < 2) + return replicas; - subsnitch.sortByProximity(address, addresses); + // TODO: avoid copy + replicas = subsnitch.sortedByProximity(address, replicas); HashMap scores = this.scores; // Make sure the score don't change in the middle of the loop below // (which wouldn't really matter here but its cleaner that way). - ArrayList subsnitchOrderedScores = new ArrayList<>(addresses.size()); - for (InetAddressAndPort inet : addresses) + ArrayList subsnitchOrderedScores = new ArrayList<>(replicas.size()); + for (Replica replica : replicas) { - Double score = scores.get(inet); + Double score = scores.get(replica.endpoint()); if (score == null) score = 0.0; subsnitchOrderedScores.add(score); @@ -250,17 +232,18 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa { if (subsnitchScore > (sortedScoreIterator.next() * (1.0 + dynamicBadnessThreshold))) { - sortByProximityWithScore(address, addresses); - return; + return sortedByProximityWithScore(address, replicas); } } + + return replicas; } // Compare endpoints given an immutable snapshot of the scores - private int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2, Map scores) + private int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2, Map scores) { - Double scored1 = scores.get(a1); - Double scored2 = scores.get(a2); + Double scored1 = scores.get(a1.endpoint()); + Double scored2 = scores.get(a2.endpoint()); if (scored1 == null) { @@ -280,7 +263,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa return 1; } - public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) + public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) { // That function is fundamentally unsafe because the scores can change at any time and so the result of that // method is not stable for identical arguments. This is why we don't rely on super.sortByProximity() in @@ -414,7 +397,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa return getSeverity(FBUtilities.getBroadcastAddressAndPort()); } - public boolean isWorthMergingForRangeQuery(List merged, List l1, List l2) + public boolean isWorthMergingForRangeQuery(ReplicaCollection merged, ReplicaCollection l1, ReplicaCollection l2) { if (!subsnitch.isWorthMergingForRangeQuery(merged, l1, l2)) return false; @@ -434,12 +417,12 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa } // Return the max score for the endpoint in the provided list, or -1.0 if no node have a score. - private double maxScore(List endpoints) + private double maxScore(ReplicaCollection endpoints) { double maxScore = -1.0; - for (InetAddressAndPort endpoint : endpoints) + for (Replica replica : endpoints) { - Double score = scores.get(endpoint); + Double score = scores.get(replica.endpoint()); if (score == null) continue; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/Ec2Snitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/Ec2Snitch.java b/src/java/org/apache/cassandra/locator/Ec2Snitch.java index b6aafd3..d0474e4 100644 --- a/src/java/org/apache/cassandra/locator/Ec2Snitch.java +++ b/src/java/org/apache/cassandra/locator/Ec2Snitch.java @@ -68,7 +68,7 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch { String az = awsApiCall(ZONE_NAME_QUERY_URL); - // if using the full naming scheme, region name is created by removing letters from the + // if using the full naming scheme, region name is created by removing letters from the // end of the availability zone and zone is the full zone name usingLegacyNaming = isUsingLegacyNaming(props); String region; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/Endpoints.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/Endpoints.java b/src/java/org/apache/cassandra/locator/Endpoints.java new file mode 100644 index 0000000..3d5faa4 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/Endpoints.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; +import org.apache.cassandra.utils.FBUtilities; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public abstract class Endpoints> extends AbstractReplicaCollection +{ + static final Map EMPTY_MAP = Collections.unmodifiableMap(new LinkedHashMap<>()); + + volatile Map byEndpoint; + + Endpoints(List list, boolean isSnapshot, Map byEndpoint) + { + super(list, isSnapshot); + this.byEndpoint = byEndpoint; + } + + @Override + public Set endpoints() + { + return byEndpoint().keySet(); + } + + public Map byEndpoint() + { + Map map = byEndpoint; + if (map == null) + byEndpoint = map = buildByEndpoint(list); + return map; + } + + public boolean contains(InetAddressAndPort endpoint, boolean isFull) + { + Replica replica = byEndpoint().get(endpoint); + return replica != null && replica.isFull() == isFull; + } + + @Override + public boolean contains(Replica replica) + { + return replica != null + && Objects.equals( + byEndpoint().get(replica.endpoint()), + replica); + } + + private static Map buildByEndpoint(List list) + { + // TODO: implement a delegating map that uses our superclass' list, and is immutable + Map byEndpoint = new LinkedHashMap<>(list.size()); + for (Replica replica : list) + { + Replica prev = byEndpoint.put(replica.endpoint(), replica); + assert prev == null : "duplicate endpoint in EndpointsForRange: " + prev + " and " + replica; + } + + return Collections.unmodifiableMap(byEndpoint); + } + + public E withoutSelf() + { + InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); + return filter(r -> !self.equals(r.endpoint())); + } + + public E without(Set remove) + { + return filter(r -> !remove.contains(r.endpoint())); + } + + public E keep(Set keep) + { + return filter(r -> keep.contains(r.endpoint())); + } + + public E keep(Iterable endpoints) + { + ReplicaCollection.Mutable copy = newMutable( + endpoints instanceof Collection + ? ((Collection) endpoints).size() + : size() + ); + Map byEndpoint = byEndpoint(); + for (InetAddressAndPort endpoint : endpoints) + { + Replica keep = byEndpoint.get(endpoint); + if (keep == null) + continue; + copy.add(keep, ReplicaCollection.Mutable.Conflict.DUPLICATE); + } + return copy.asSnapshot(); + } + + /** + * Care must be taken to ensure no conflicting ranges occur in pending and natural. + * Conflicts can occur for two reasons: + * 1) due to lack of isolation when reading pending/natural + * 2) because a movement that changes the type of replication from transient to full must be handled + * differently for reads and writes (with the reader treating it as transient, and writer as full) + * + * The method haveConflicts() below, and resolveConflictsInX, are used to detect and resolve any issues + */ + public static > E concat(E natural, E pending) + { + return AbstractReplicaCollection.concat(natural, pending, Conflict.NONE); + } + + public static > boolean haveConflicts(E natural, E pending) + { + Set naturalEndpoints = natural.endpoints(); + for (InetAddressAndPort pendingEndpoint : pending.endpoints()) + { + if (naturalEndpoints.contains(pendingEndpoint)) + return true; + } + return false; + } + + // must apply first + public static > E resolveConflictsInNatural(E natural, E pending) + { + return natural.filter(r -> !r.isTransient() || !pending.contains(r.endpoint(), true)); + } + + // must apply second + public static > E resolveConflictsInPending(E natural, E pending) + { + return pending.without(natural.endpoints()); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsByRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/EndpointsByRange.java b/src/java/org/apache/cassandra/locator/EndpointsByRange.java new file mode 100644 index 0000000..cdc8a68 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/EndpointsByRange.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; + +import java.util.Collections; +import java.util.Map; + +public class EndpointsByRange extends ReplicaMultimap, EndpointsForRange> +{ + public EndpointsByRange(Map, EndpointsForRange> map) + { + super(map); + } + + public EndpointsForRange get(Range range) + { + Preconditions.checkNotNull(range); + return map.getOrDefault(range, EndpointsForRange.empty(range)); + } + + public static class Mutable extends ReplicaMultimap.Mutable, EndpointsForRange.Mutable> + { + @Override + protected EndpointsForRange.Mutable newMutable(Range range) + { + return new EndpointsForRange.Mutable(range); + } + + // TODO: consider all ignoreDuplicates cases + public void putAll(Range range, EndpointsForRange replicas, Conflict ignoreConflicts) + { + get(range).addAll(replicas, ignoreConflicts); + } + + public EndpointsByRange asImmutableView() + { + return new EndpointsByRange(Collections.unmodifiableMap(Maps.transformValues(map, EndpointsForRange.Mutable::asImmutableView))); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsByReplica.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/EndpointsByReplica.java b/src/java/org/apache/cassandra/locator/EndpointsByReplica.java new file mode 100644 index 0000000..ceea2d1 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/EndpointsByReplica.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; + +import java.util.Collections; +import java.util.Map; + +public class EndpointsByReplica extends ReplicaMultimap +{ + public EndpointsByReplica(Map map) + { + super(map); + } + + public EndpointsForRange get(Replica range) + { + Preconditions.checkNotNull(range); + return map.getOrDefault(range, EndpointsForRange.empty(range.range())); + } + + public static class Mutable extends ReplicaMultimap.Mutable + { + @Override + protected EndpointsForRange.Mutable newMutable(Replica replica) + { + return new EndpointsForRange.Mutable(replica.range()); + } + + // TODO: consider all ignoreDuplicates cases + public void putAll(Replica range, EndpointsForRange replicas, Conflict ignoreConflicts) + { + map.computeIfAbsent(range, r -> newMutable(r)).addAll(replicas, ignoreConflicts); + } + + public EndpointsByReplica asImmutableView() + { + return new EndpointsByReplica(Collections.unmodifiableMap(Maps.transformValues(map, EndpointsForRange.Mutable::asImmutableView))); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsForRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/EndpointsForRange.java b/src/java/org/apache/cassandra/locator/EndpointsForRange.java new file mode 100644 index 0000000..c2d8232 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/EndpointsForRange.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import com.google.common.base.Preconditions; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static com.google.common.collect.Iterables.all; + +/** + * A ReplicaCollection where all Replica are required to cover a range that fully contains the range() defined in the builder(). + * Endpoints are guaranteed to be unique; on construction, this is enforced unless optionally silenced (in which case + * only the first occurrence makes the cut). + */ +public class EndpointsForRange extends Endpoints +{ + private final Range range; + private EndpointsForRange(Range range, List list, boolean isSnapshot) + { + this(range, list, isSnapshot, null); + } + private EndpointsForRange(Range range, List list, boolean isSnapshot, Map byEndpoint) + { + super(list, isSnapshot, byEndpoint); + this.range = range; + assert range != null; + } + + public Range range() + { + return range; + } + + @Override + public Mutable newMutable(int initialCapacity) + { + return new Mutable(range, initialCapacity); + } + + public EndpointsForToken forToken(Token token) + { + if (!range.contains(token)) + throw new IllegalArgumentException(token + " is not contained within " + range); + return new EndpointsForToken(token, list, isSnapshot, byEndpoint); + } + + @Override + public EndpointsForRange self() + { + return this; + } + + @Override + protected EndpointsForRange snapshot(List snapshot) + { + if (snapshot.isEmpty()) return empty(range); + return new EndpointsForRange(range, snapshot, true); + } + + public static class Mutable extends EndpointsForRange implements ReplicaCollection.Mutable + { + boolean hasSnapshot; + public Mutable(Range range) { this(range, 0); } + public Mutable(Range range, int capacity) { super(range, new ArrayList<>(capacity), false, new LinkedHashMap<>()); } + + public void add(Replica replica, Conflict ignoreConflict) + { + if (hasSnapshot) throw new IllegalStateException(); + Preconditions.checkNotNull(replica); + if (!replica.range().contains(super.range)) + throw new IllegalArgumentException("Replica " + replica + " does not contain " + super.range); + + Replica prev = super.byEndpoint.put(replica.endpoint(), replica); + if (prev != null) + { + super.byEndpoint.put(replica.endpoint(), prev); // restore prev + switch (ignoreConflict) + { + case DUPLICATE: + if (prev.equals(replica)) + break; + case NONE: + throw new IllegalArgumentException("Conflicting replica added (expected unique endpoints): " + replica + "; existing: " + prev); + case ALL: + } + return; + } + + list.add(replica); + } + + @Override + public Map byEndpoint() + { + // our internal map is modifiable, but it is unsafe to modify the map externally + // it would be possible to implement a safe modifiable map, but it is probably not valuable + return Collections.unmodifiableMap(super.byEndpoint()); + } + + private EndpointsForRange get(boolean isSnapshot) + { + return new EndpointsForRange(super.range, super.list, isSnapshot, Collections.unmodifiableMap(super.byEndpoint)); + } + + public EndpointsForRange asImmutableView() + { + return get(false); + } + + public EndpointsForRange asSnapshot() + { + hasSnapshot = true; + return get(true); + } + } + + public static class Builder extends ReplicaCollection.Builder + { + public Builder(Range range) { this(range, 0); } + public Builder(Range range, int capacity) { super (new Mutable(range, capacity)); } + public boolean containsEndpoint(InetAddressAndPort endpoint) + { + return mutable.asImmutableView().byEndpoint.containsKey(endpoint); + } + } + + public static Builder builder(Range range) + { + return new Builder(range); + } + public static Builder builder(Range range, int capacity) + { + return new Builder(range, capacity); + } + + public static EndpointsForRange empty(Range range) + { + return new EndpointsForRange(range, EMPTY_LIST, true, EMPTY_MAP); + } + + public static EndpointsForRange of(Replica replica) + { + // we only use ArrayList or ArrayList.SubList, to ensure callsites are bimorphic + ArrayList one = new ArrayList<>(1); + one.add(replica); + // we can safely use singletonMap, as we only otherwise use LinkedHashMap + return new EndpointsForRange(replica.range(), one, true, Collections.unmodifiableMap(Collections.singletonMap(replica.endpoint(), replica))); + } + + public static EndpointsForRange of(Replica ... replicas) + { + return copyOf(Arrays.asList(replicas)); + } + + public static EndpointsForRange copyOf(Collection replicas) + { + if (replicas.isEmpty()) + throw new IllegalArgumentException("Collection must be non-empty to copy"); + Range range = replicas.iterator().next().range(); + assert all(replicas, r -> range.equals(r.range())); + return builder(range, replicas.size()).addAll(replicas).build(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsForToken.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/EndpointsForToken.java b/src/java/org/apache/cassandra/locator/EndpointsForToken.java new file mode 100644 index 0000000..f24c615 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/EndpointsForToken.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import com.google.common.base.Preconditions; +import org.apache.cassandra.dht.Token; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * A ReplicaCollection where all Replica are required to cover a range that fully contains the token() defined in the builder(). + * Endpoints are guaranteed to be unique; on construction, this is enforced unless optionally silenced (in which case + * only the first occurrence makes the cut). + */ +public class EndpointsForToken extends Endpoints +{ + private final Token token; + private EndpointsForToken(Token token, List list, boolean isSnapshot) + { + this(token, list, isSnapshot, null); + } + + EndpointsForToken(Token token, List list, boolean isSnapshot, Map byEndpoint) + { + super(list, isSnapshot, byEndpoint); + this.token = token; + assert token != null; + } + + public Token token() + { + return token; + } + + @Override + public Mutable newMutable(int initialCapacity) + { + return new Mutable(token, initialCapacity); + } + + @Override + public EndpointsForToken self() + { + return this; + } + + @Override + protected EndpointsForToken snapshot(List subList) + { + if (subList.isEmpty()) return empty(token); + return new EndpointsForToken(token, subList, true); + } + + public static class Mutable extends EndpointsForToken implements ReplicaCollection.Mutable + { + boolean hasSnapshot; + public Mutable(Token token) { this(token, 0); } + public Mutable(Token token, int capacity) { super(token, new ArrayList<>(capacity), false, new LinkedHashMap<>()); } + + public void add(Replica replica, Conflict ignoreConflict) + { + if (hasSnapshot) throw new IllegalStateException(); + Preconditions.checkNotNull(replica); + if (!replica.range().contains(super.token)) + throw new IllegalArgumentException("Replica " + replica + " does not contain " + super.token); + + Replica prev = super.byEndpoint.put(replica.endpoint(), replica); + if (prev != null) + { + super.byEndpoint.put(replica.endpoint(), prev); // restore prev + switch (ignoreConflict) + { + case DUPLICATE: + if (prev.equals(replica)) + break; + case NONE: + throw new IllegalArgumentException("Conflicting replica added (expected unique endpoints): " + replica + "; existing: " + prev); + case ALL: + } + return; + } + + list.add(replica); + } + + @Override + public Map byEndpoint() + { + // our internal map is modifiable, but it is unsafe to modify the map externally + // it would be possible to implement a safe modifiable map, but it is probably not valuable + return Collections.unmodifiableMap(super.byEndpoint()); + } + + private EndpointsForToken get(boolean isSnapshot) + { + return new EndpointsForToken(super.token, super.list, isSnapshot, Collections.unmodifiableMap(super.byEndpoint)); + } + + public EndpointsForToken asImmutableView() + { + return get(false); + } + + public EndpointsForToken asSnapshot() + { + hasSnapshot = true; + return get(true); + } + } + + public static class Builder extends ReplicaCollection.Builder + { + public Builder(Token token) { this(token, 0); } + public Builder(Token token, int capacity) { super (new Mutable(token, capacity)); } + } + + public static Builder builder(Token token) + { + return new Builder(token); + } + public static Builder builder(Token token, int capacity) + { + return new Builder(token, capacity); + } + + public static EndpointsForToken empty(Token token) + { + return new EndpointsForToken(token, EMPTY_LIST, true, EMPTY_MAP); + } + + public static EndpointsForToken of(Token token, Replica replica) + { + // we only use ArrayList or ArrayList.SubList, to ensure callsites are bimorphic + ArrayList one = new ArrayList<>(1); + one.add(replica); + // we can safely use singletonMap, as we only otherwise use LinkedHashMap + return new EndpointsForToken(token, one, true, Collections.unmodifiableMap(Collections.singletonMap(replica.endpoint(), replica))); + } + + public static EndpointsForToken of(Token token, Replica ... replicas) + { + return copyOf(token, Arrays.asList(replicas)); + } + + public static EndpointsForToken copyOf(Token token, Collection replicas) + { + if (replicas.isEmpty()) return empty(token); + return builder(token, replicas.size()).addAll(replicas).build(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/IEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java index 63d333b..b7797b0 100644 --- a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java @@ -17,8 +17,6 @@ */ package org.apache.cassandra.locator; -import java.util.Collection; -import java.util.List; import java.util.Set; /** @@ -39,20 +37,20 @@ public interface IEndpointSnitch */ public String getDatacenter(InetAddressAndPort endpoint); - /** - * returns a new List sorted by proximity to the given endpoint - */ - public List getSortedListByProximity(InetAddressAndPort address, Collection unsortedAddress); + default public String getDatacenter(Replica replica) + { + return getDatacenter(replica.endpoint()); + } /** - * This method will sort the List by proximity to the given address. + * returns a new List sorted by proximity to the given endpoint */ - public void sortByProximity(InetAddressAndPort address, List addresses); + public > C sortedByProximity(final InetAddressAndPort address, C addresses); /** * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would */ - public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2); + public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2); /** * called after Gossiper instance exists immediately before it starts gossiping @@ -63,7 +61,7 @@ public interface IEndpointSnitch * Returns whether for a range query doing a query against merged is likely * to be faster than 2 sequential queries, one against l1 followed by one against l2. */ - public boolean isWorthMergingForRangeQuery(List merged, List l1, List l2); + public boolean isWorthMergingForRangeQuery(ReplicaCollection merged, ReplicaCollection l1, ReplicaCollection l2); /** * Determine if the datacenter or rack values in the current node's snitch conflict with those passed in parameters. http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/InetAddressAndPort.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java index 38a1a49..a47c72a 100644 --- a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java +++ b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java @@ -25,6 +25,7 @@ import java.net.UnknownHostException; import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.FastByteOperations; /** @@ -191,9 +192,9 @@ public final class InetAddressAndPort implements Comparable, return InetAddressAndPort.getByAddress(InetAddress.getLoopbackAddress()); } - public static InetAddressAndPort getLocalHost() throws UnknownHostException + public static InetAddressAndPort getLocalHost() { - return InetAddressAndPort.getByAddress(InetAddress.getLocalHost()); + return FBUtilities.getLocalAddressAndPort(); } public static void initializeDefaultPort(int port) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/LocalStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java b/src/java/org/apache/cassandra/locator/LocalStrategy.java index a76fe96..41cc9b0 100644 --- a/src/java/org/apache/cassandra/locator/LocalStrategy.java +++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java @@ -17,12 +17,11 @@ */ package org.apache.cassandra.locator; -import java.util.ArrayList; import java.util.Collections; import java.util.Collection; -import java.util.List; import java.util.Map; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.dht.RingPosition; import org.apache.cassandra.dht.Token; @@ -30,32 +29,40 @@ import org.apache.cassandra.utils.FBUtilities; public class LocalStrategy extends AbstractReplicationStrategy { + private static final ReplicationFactor RF = ReplicationFactor.fullOnly(1); + private final EndpointsForRange replicas; + public LocalStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map configOptions) { super(keyspaceName, tokenMetadata, snitch, configOptions); + replicas = EndpointsForRange.of( + new Replica(FBUtilities.getBroadcastAddressAndPort(), + DatabaseDescriptor.getPartitioner().getMinimumToken(), + DatabaseDescriptor.getPartitioner().getMinimumToken(), + true + ) + ); } /** - * We need to override this even if we override calculateNaturalEndpoints, + * We need to override this even if we override calculateNaturalReplicas, * because the default implementation depends on token calculations but * LocalStrategy may be used before tokens are set up. */ @Override - public ArrayList getNaturalEndpoints(RingPosition searchPosition) + public EndpointsForRange getNaturalReplicas(RingPosition searchPosition) { - ArrayList l = new ArrayList(1); - l.add(FBUtilities.getBroadcastAddressAndPort()); - return l; + return replicas; } - public List calculateNaturalEndpoints(Token token, TokenMetadata metadata) + public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata) { - return Collections.singletonList(FBUtilities.getBroadcastAddressAndPort()); + return replicas; } - public int getReplicationFactor() + public ReplicationFactor getReplicationFactor() { - return 1; + return RF; } public void validateOptions() throws ConfigurationException --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org