cassandra-commits mailing list archives

From alek...@apache.org
Subject [7/9] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Date Wed, 18 Apr 2018 10:39:06 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/reads/DataResolver.java
index 4c7a6c9,0000000..ebf6a6f
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@@ -1,231 -1,0 +1,214 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.service.reads;
 +
 +import java.util.*;
 +
 +import com.google.common.base.Joiner;
 +import com.google.common.collect.Iterables;
 +
++import org.apache.cassandra.db.*;
++import org.apache.cassandra.db.filter.*;
++import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
++import org.apache.cassandra.db.transform.*;
 +import org.apache.cassandra.locator.InetAddressAndPort;
++import org.apache.cassandra.net.*;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.service.reads.repair.ReadRepair;
- import org.apache.cassandra.db.*;
- import org.apache.cassandra.db.filter.*;
- import org.apache.cassandra.db.partitions.*;
- import org.apache.cassandra.db.transform.*;
- import org.apache.cassandra.net.*;
- import org.apache.cassandra.tracing.TraceState;
 +
 +public class DataResolver extends ResponseResolver
 +{
 +    private final long queryStartNanoTime;
 +    private final boolean enforceStrictLiveness;
 +
 +    public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime, ReadRepair readRepair)
 +    {
 +        super(keyspace, command, consistency, readRepair, maxResponseCount);
 +        this.queryStartNanoTime = queryStartNanoTime;
 +        this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
 +    }
 +
 +    public PartitionIterator getData()
 +    {
 +        ReadResponse response = responses.iterator().next().payload;
 +        return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
 +    }
 +
 +    public boolean isDataPresent()
 +    {
 +        return !responses.isEmpty();
 +    }
 +
 +    public PartitionIterator resolve()
 +    {
 +        // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
 +        // at the beginning of this method), so grab the response count once and use it throughout the method.
 +        int count = responses.size();
 +        List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
 +        InetAddressAndPort[] sources = new InetAddressAndPort[count];
 +        for (int i = 0; i < count; i++)
 +        {
 +            MessageIn<ReadResponse> msg = responses.get(i);
 +            iters.add(msg.payload.makeIterator(command));
 +            sources[i] = msg.from;
 +        }
 +
 +        /*
 +         * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
 +         * have more rows than the client requested. To make sure that we still conform to the original limit,
 +         * we apply a top-level post-reconciliation counter to the merged partition iterator.
 +         *
 +         * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter being applied
 +         * to the current partition in order to work. For this reason we have to apply the counter transformation before
 +         * empty partition discard logic kicks in - for it will eagerly consume the iterator.
 +         *
 +         * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
 +         *
 +         * See CASSANDRA-13747 for more details.
 +         */
 +
 +        DataLimits.Counter mergedResultCounter =
-         command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
++            command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
 +
 +        UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter);
 +        FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
 +        PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
 +        return Transformation.apply(counted, new EmptyPartitionsDiscarder());
 +    }
 +
 +    private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
 +                                                                     InetAddressAndPort[] sources,
 +                                                                     DataLimits.Counter mergedResultCounter)
 +    {
 +        // If we have only one result, there is no read repair to do and we can't get short reads
 +        if (results.size() == 1)
 +            return results.get(0);
 +
 +        /*
 +         * So-called short reads stem from nodes returning only a subset of the results they have due to the limit,
 +         * with that subset not being enough post-reconciliation. So if we don't have a limit, don't bother.
 +         */
 +        if (!command.limits().isUnlimited())
 +            for (int i = 0; i < results.size(); i++)
-             {
 +                results.set(i, ShortReadProtection.extend(sources[i], results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
-             }
 +
 +        return UnfilteredPartitionIterators.merge(results, command.nowInSec(), wrapMergeListener(readRepair.getMergeListener(sources), sources));
 +    }
 +
-     public void evaluateAllResponses()
-     {
-         // We need to fully consume the results to trigger read repairs if appropriate
-         try (PartitionIterator iterator = resolve())
-         {
-             PartitionIterators.consume(iterator);
-         }
-     }
- 
-     public void evaluateAllResponses(TraceState traceState)
-     {
-         evaluateAllResponses();
-     }
- 
 +    private String makeResponsesDebugString(DecoratedKey partitionKey)
 +    {
 +        return Joiner.on(",\n").join(Iterables.transform(getMessages(), m -> m.from + " => " + m.payload.toDebugString(command, partitionKey)));
 +    }
 +
 +    private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, InetAddressAndPort[] sources)
 +    {
 +        return new UnfilteredPartitionIterators.MergeListener()
 +        {
 +            public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
 +            {
 +                UnfilteredRowIterators.MergeListener rowListener = partitionListener.getRowMergeListener(partitionKey, versions);
 +
 +                return new UnfilteredRowIterators.MergeListener()
 +                {
 +                    public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
 +                    {
 +                        try
 +                        {
 +                            rowListener.onMergedPartitionLevelDeletion(mergedDeletion, versions);
 +                        }
 +                        catch (AssertionError e)
 +                        {
 +                            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
 +                            // rather get more info to debug than not.
 +                            TableMetadata table = command.metadata();
 +                            String details = String.format("Error merging partition level deletion on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
 +                                                           table,
 +                                                           mergedDeletion == null ? "null" : mergedDeletion.toString(),
 +                                                           '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']',
 +                                                           Arrays.toString(sources),
 +                                                           makeResponsesDebugString(partitionKey));
 +                            throw new AssertionError(details, e);
 +                        }
 +                    }
 +
 +                    public void onMergedRows(Row merged, Row[] versions)
 +                    {
 +                        try
 +                        {
 +                            rowListener.onMergedRows(merged, versions);
 +                        }
 +                        catch (AssertionError e)
 +                        {
 +                            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
 +                            // rather get more info to debug than not.
 +                            TableMetadata table = command.metadata();
 +                            String details = String.format("Error merging rows on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
 +                                                           table,
 +                                                           merged == null ? "null" : merged.toString(table),
 +                                                           '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
 +                                                           Arrays.toString(sources),
 +                                                           makeResponsesDebugString(partitionKey));
 +                            throw new AssertionError(details, e);
 +                        }
 +                    }
 +
 +                    public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
 +                    {
 +                        try
 +                        {
 +                            // The code for merging range tombstones is a tad complex and we had the assertions there triggered
 +                            // unexpectedly on a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insight
 +                            // into what happened without more context than what the assertion errors give us, hence the
 +                            // catch here, which gathers as much context as is reasonable.
 +                            rowListener.onMergedRangeTombstoneMarkers(merged, versions);
 +                        }
 +                        catch (AssertionError e)
 +                        {
 +
 +                            // The following can be pretty verbose, but it's really only triggered if a bug happens, so we'd
 +                            // rather get more info to debug than not.
 +                            TableMetadata table = command.metadata();
 +                            String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
 +                                                           table,
 +                                                           merged == null ? "null" : merged.toString(table),
 +                                                           '[' + Joiner.on(", ").join(Iterables.transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
 +                                                           Arrays.toString(sources),
 +                                                           makeResponsesDebugString(partitionKey));
 +                            throw new AssertionError(details, e);
 +                        }
 +
 +                    }
 +
 +                    public void close()
 +                    {
 +                        rowListener.close();
 +                    }
 +                };
 +            }
 +
 +            public void close()
 +            {
 +                partitionListener.close();
 +            }
 +        };
 +    }
 +}
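
The comment in resolve() above (and the ShortReadProtection wrapping in mergeWithShortReadProtection()) is easier to follow with concrete numbers. The sketch below is a standalone illustration using plain JDK collections only; the row keys, replica contents and limit are invented for the example and none of the Cassandra types are involved.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Standalone illustration of the "short read" described in resolve() / CASSANDRA-13747.
public class ShortReadSketch
{
    public static void main(String[] args)
    {
        int limit = 2;

        // Replica A stops at the per-replica limit: it sends rows 1 and 2,
        // even though it also holds live rows 3 and 4.
        List<Integer> liveFromA = Arrays.asList(1, 2);

        // Replica B only holds tombstones for rows 1 and 2, so that is all it sends.
        Set<Integer> tombstonesFromB = new HashSet<>(Arrays.asList(1, 2));

        // Post-reconciliation, B's tombstones suppress everything A sent, and A never
        // sent rows 3 and 4. The merged result is empty even though the cluster could
        // have satisfied the limit -- a short read. This is why the merged counter is
        // applied after the merge and why each source is wrapped with
        // ShortReadProtection.extend(), which can go back to A for more rows.
        Set<Integer> merged = new TreeSet<>(liveFromA);
        merged.removeAll(tombstonesFromB);

        System.out.println("limit=" + limit + ", merged live rows=" + merged.size());
    }
}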

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/DigestResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/reads/DigestResolver.java
index 828a65e,0000000..b2eb0c6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@@ -1,93 -1,0 +1,85 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.service.reads;
 +
 +import java.nio.ByteBuffer;
 +import java.util.concurrent.TimeUnit;
 +
 +import com.google.common.base.Preconditions;
 +
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.net.MessageIn;
 +import org.apache.cassandra.service.reads.repair.ReadRepair;
 +import org.apache.cassandra.tracing.TraceState;
 +
 +public class DigestResolver extends ResponseResolver
 +{
 +    private volatile ReadResponse dataResponse;
 +
 +    public DigestResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair, int maxResponseCount)
 +    {
 +        super(keyspace, command, consistency, readRepair, maxResponseCount);
 +        Preconditions.checkArgument(command instanceof SinglePartitionReadCommand,
 +                                    "DigestResolver can only be used with SinglePartitionReadCommand commands");
 +    }
 +
 +    @Override
 +    public void preprocess(MessageIn<ReadResponse> message)
 +    {
 +        super.preprocess(message);
 +        if (dataResponse == null && !message.payload.isDigestResponse())
 +            dataResponse = message.payload;
 +    }
 +
 +    public PartitionIterator getData()
 +    {
 +        assert isDataPresent();
 +        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
 +    }
 +
 +    public boolean responsesMatch()
 +    {
 +        long start = System.nanoTime();
 +
 +        // validate digests against each other; return false immediately on mismatch.
 +        ByteBuffer digest = null;
 +        for (MessageIn<ReadResponse> message : responses)
 +        {
 +            ReadResponse response = message.payload;
 +
 +            ByteBuffer newDigest = response.digest(command);
 +            if (digest == null)
 +                digest = newDigest;
 +            else if (!digest.equals(newDigest))
 +                // rely on the fact that only single partition queries use digests
 +                return false;
 +        }
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("responsesMatch: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 +
 +        return true;
 +    }
 +
-     public void evaluateAllResponses(TraceState traceState)
-     {
-         if (!responsesMatch())
-         {
-             readRepair.backgroundDigestRepair(traceState);
-         }
-     }
- 
 +    public boolean isDataPresent()
 +    {
 +        return dataResponse != null;
 +    }
 +}
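
responsesMatch() above compares every digest against the first one received and returns false on the first mismatch. The snippet below is a plain-JDK analogue of that loop; the payload strings and the use of MD5 are assumptions made for the example, not the hashing actually performed by ReadResponse.digest().

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.List;

public class DigestMatchSketch
{
    static ByteBuffer digest(String payload) throws Exception
    {
        // MD5 is only a stand-in here for whatever ReadResponse.digest() computes.
        MessageDigest md = MessageDigest.getInstance("MD5");
        return ByteBuffer.wrap(md.digest(payload.getBytes(StandardCharsets.UTF_8)));
    }

    public static void main(String[] args) throws Exception
    {
        // Pretend serialized responses from three replicas; the third one is stale.
        List<String> responses = Arrays.asList("row@10", "row@10", "row@9");

        ByteBuffer first = null;
        boolean match = true;
        for (String response : responses)
        {
            ByteBuffer d = digest(response);
            if (first == null)
                first = d;
            else if (!first.equals(d))
            {
                match = false; // one mismatch is enough to force a full data read + repair
                break;
            }
        }
        System.out.println("digests match: " + match);
    }
}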

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/ReadCallback.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/reads/ReadCallback.java
index 62fdfaa,0000000..a35fc2e
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@@ -1,213 -1,0 +1,206 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.service.reads;
 +
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 +
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.exceptions.RequestFailureReason;
 +import org.apache.cassandra.exceptions.ReadFailureException;
 +import org.apache.cassandra.exceptions.ReadTimeoutException;
 +import org.apache.cassandra.exceptions.UnavailableException;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.metrics.ReadRepairMetrics;
 +import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 +import org.apache.cassandra.net.MessageIn;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.service.reads.repair.ReadRepair;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.concurrent.SimpleCondition;
 +
 +public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
 +{
 +    protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 +
 +    public final ResponseResolver resolver;
 +    final SimpleCondition condition = new SimpleCondition();
 +    private final long queryStartNanoTime;
 +    final int blockfor;
 +    final List<InetAddressAndPort> endpoints;
 +    private final ReadCommand command;
 +    private final ConsistencyLevel consistencyLevel;
 +    private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
 +            = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
 +    private volatile int received = 0;
 +    private static final AtomicIntegerFieldUpdater<ReadCallback> failuresUpdater
 +            = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures");
 +    private volatile int failures = 0;
 +    private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
 +
 +    private final Keyspace keyspace; // TODO push this into ConsistencyLevel?
 +
 +    private final ReadRepair readRepair;
 +
 +    /**
 +     * Constructor when response count has to be calculated and blocked for.
 +     */
 +    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddressAndPort> filteredEndpoints, long queryStartNanoTime, ReadRepair readRepair)
 +    {
 +        this(resolver,
 +             consistencyLevel,
 +             consistencyLevel.blockFor(Keyspace.open(command.metadata().keyspace)),
 +             command,
 +             Keyspace.open(command.metadata().keyspace),
 +             filteredEndpoints,
 +             queryStartNanoTime, readRepair);
 +    }
 +
 +    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddressAndPort> endpoints, long queryStartNanoTime, ReadRepair readRepair)
 +    {
 +        this.command = command;
 +        this.keyspace = keyspace;
 +        this.blockfor = blockfor;
 +        this.consistencyLevel = consistencyLevel;
 +        this.resolver = resolver;
 +        this.queryStartNanoTime = queryStartNanoTime;
 +        this.endpoints = endpoints;
 +        this.readRepair = readRepair;
 +        this.failureReasonByEndpoint = new ConcurrentHashMap<>();
 +        // we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897)
 +        assert !(command instanceof PartitionRangeReadCommand) || blockfor >= endpoints.size();
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("Blockfor is {}; setting up requests to {}", blockfor, StringUtils.join(this.endpoints, ","));
 +    }
 +
 +    public boolean await(long timePastStart, TimeUnit unit)
 +    {
 +        long time = unit.toNanos(timePastStart) - (System.nanoTime() - queryStartNanoTime);
 +        try
 +        {
 +            return condition.await(time, TimeUnit.NANOSECONDS);
 +        }
 +        catch (InterruptedException ex)
 +        {
 +            throw new AssertionError(ex);
 +        }
 +    }
 +
 +    public void awaitResults() throws ReadFailureException, ReadTimeoutException
 +    {
 +        boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS);
 +        boolean failed = blockfor + failures > endpoints.size();
 +        if (signaled && !failed)
 +            return;
 +
 +        if (Tracing.isTracing())
 +        {
 +            String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
 +            Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
 +        }
 +        else if (logger.isDebugEnabled())
 +        {
 +            String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
 +            logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
 +        }
 +
 +        // Same as for writes, see AbstractWriteResponseHandler
 +        throw failed
 +            ? new ReadFailureException(consistencyLevel, received, blockfor, resolver.isDataPresent(), failureReasonByEndpoint)
 +            : new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
 +    }
 +
 +    public int blockFor()
 +    {
 +        return blockfor;
 +    }
 +
 +    public void response(MessageIn<ReadResponse> message)
 +    {
 +        resolver.preprocess(message);
 +        int n = waitingFor(message.from)
 +              ? recievedUpdater.incrementAndGet(this)
 +              : received;
++
 +        if (n >= blockfor && resolver.isDataPresent())
-         {
 +            condition.signalAll();
-             // kick off a background digest comparison if this is a result that (may have) arrived after
-             // the original resolve that get() kicks off as soon as the condition is signaled
-             if (blockfor < endpoints.size() && n == endpoints.size())
-             {
-                 readRepair.maybeStartBackgroundRepair(resolver);
-             }
-         }
 +    }
 +
 +    /**
 +     * @return true if the message counts towards the blockfor threshold
 +     */
 +    private boolean waitingFor(InetAddressAndPort from)
 +    {
 +        return consistencyLevel.isDatacenterLocal()
 +             ? DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from))
 +             : true;
 +    }
 +
 +    /**
 +     * @return the current number of received responses
 +     */
 +    public int getReceivedCount()
 +    {
 +        return received;
 +    }
 +
 +    public void response(ReadResponse result)
 +    {
 +        MessageIn<ReadResponse> message = MessageIn.create(FBUtilities.getBroadcastAddressAndPort(),
 +                                                           result,
 +                                                           Collections.emptyMap(),
 +                                                           MessagingService.Verb.INTERNAL_RESPONSE,
 +                                                           MessagingService.current_version);
 +        response(message);
 +    }
 +
 +    public void assureSufficientLiveNodes() throws UnavailableException
 +    {
 +        consistencyLevel.assureSufficientLiveNodes(keyspace, endpoints);
 +    }
 +
 +    public boolean isLatencyForSnitch()
 +    {
 +        return true;
 +    }
 +
 +    @Override
 +    public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
 +    {
 +        int n = waitingFor(from)
 +              ? failuresUpdater.incrementAndGet(this)
 +              : failures;
 +
 +        failureReasonByEndpoint.put(from, failureReason);
 +
 +        if (blockfor + n > endpoints.size())
 +            condition.signalAll();
 +    }
 +}
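
The failed-vs-timed-out decision in awaitResults() above reduces to arithmetic over blockfor, the contacted endpoints and the failure count: once blockfor + failures exceeds the endpoint count, blockfor can no longer be reached and the read is reported as failed rather than timed out. A minimal sketch with made-up numbers (QUORUM against three replicas is assumed purely for illustration; "signaled" stands in for the condition having been signalled):

public class ReadCallbackMathSketch
{
    public static void main(String[] args)
    {
        int endpoints = 3; // replicas contacted
        int blockfor  = 2; // e.g. QUORUM with RF=3
        int received  = 1; // data/digest responses so far
        int failures  = 2; // explicit failure responses so far

        boolean signaled = received >= blockfor;             // condition.signalAll() would have fired
        boolean failed   = blockfor + failures > endpoints;  // 2 + 2 > 3: quorum is now unreachable

        if (signaled && !failed)
            System.out.println("read succeeds");
        else
            System.out.println(failed ? "ReadFailureException" : "ReadTimeoutException");
    }
}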

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/reads/ResponseResolver.java
index 69ec063,0000000..f4f00a2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
@@@ -1,66 -1,0 +1,60 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.service.reads;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.net.MessageIn;
 +import org.apache.cassandra.service.reads.repair.ReadRepair;
- import org.apache.cassandra.tracing.TraceState;
 +import org.apache.cassandra.utils.concurrent.Accumulator;
 +
 +public abstract class ResponseResolver
 +{
 +    protected static final Logger logger = LoggerFactory.getLogger(ResponseResolver.class);
 +
 +    protected final Keyspace keyspace;
 +    protected final ReadCommand command;
 +    protected final ConsistencyLevel consistency;
 +    protected final ReadRepair readRepair;
 +
 +    // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints
 +    protected final Accumulator<MessageIn<ReadResponse>> responses;
 +
 +    public ResponseResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair, int maxResponseCount)
 +    {
 +        this.keyspace = keyspace;
 +        this.command = command;
 +        this.consistency = consistency;
 +        this.readRepair = readRepair;
 +        this.responses = new Accumulator<>(maxResponseCount);
 +    }
 +
-     /**
-      * Consume the accumulated responses, starting a read repair if neccesary
-      */
-     public abstract void evaluateAllResponses(TraceState traceState);
- 
 +    public abstract boolean isDataPresent();
 +
 +    public void preprocess(MessageIn<ReadResponse> message)
 +    {
 +        responses.add(message);
 +    }
 +
 +    public Accumulator<MessageIn<ReadResponse>> getMessages()
 +    {
 +        return responses;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index 8689356,0000000..f207b7d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@@ -1,280 -1,0 +1,250 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads.repair;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.Queue;
 +import java.util.concurrent.ConcurrentLinkedQueue;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
 +import java.util.function.Consumer;
 +
 +import javax.annotation.Nullable;
 +
 +import com.google.common.util.concurrent.AbstractFuture;
 +import com.google.common.util.concurrent.FutureCallback;
 +import com.google.common.util.concurrent.Futures;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import org.apache.cassandra.concurrent.Stage;
- import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.exceptions.ReadTimeoutException;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.metrics.ReadRepairMetrics;
 +import org.apache.cassandra.net.AsyncOneResponse;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
- import org.apache.cassandra.service.reads.AsyncRepairCallback;
 +import org.apache.cassandra.service.reads.DataResolver;
 +import org.apache.cassandra.service.reads.DigestResolver;
 +import org.apache.cassandra.service.reads.ReadCallback;
- import org.apache.cassandra.service.reads.ResponseResolver;
- import org.apache.cassandra.tracing.TraceState;
 +import org.apache.cassandra.tracing.Tracing;
 +
 +/**
 + * 'Classic' read repair. Doesn't allow the client read to return until
 + *  updates have been written to nodes needing correction.
 + */
 +public class BlockingReadRepair implements ReadRepair, RepairListener
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class);
 +
 +    private static final boolean DROP_OVERSIZED_READ_REPAIR_MUTATIONS =
 +        Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
 +
 +    private final ReadCommand command;
 +    private final List<InetAddressAndPort> endpoints;
 +    private final long queryStartNanoTime;
 +    private final ConsistencyLevel consistency;
 +
 +    private final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>();
 +
 +    private volatile DigestRepair digestRepair = null;
 +
 +    private static class DigestRepair
 +    {
 +        private final DataResolver dataResolver;
 +        private final ReadCallback readCallback;
 +        private final Consumer<PartitionIterator> resultConsumer;
 +
 +        public DigestRepair(DataResolver dataResolver, ReadCallback readCallback, Consumer<PartitionIterator> resultConsumer)
 +        {
 +            this.dataResolver = dataResolver;
 +            this.readCallback = readCallback;
 +            this.resultConsumer = resultConsumer;
 +        }
 +    }
 +
 +    public BlockingReadRepair(ReadCommand command,
 +                              List<InetAddressAndPort> endpoints,
 +                              long queryStartNanoTime,
 +                              ConsistencyLevel consistency)
 +    {
 +        this.command = command;
 +        this.endpoints = endpoints;
 +        this.queryStartNanoTime = queryStartNanoTime;
 +        this.consistency = consistency;
 +    }
 +
 +    public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
 +    {
 +        return new PartitionIteratorMergeListener(endpoints, command, this);
 +    }
 +
 +    public static class BlockingPartitionRepair extends AbstractFuture<Object> implements RepairListener.PartitionRepair
 +    {
 +
 +        final List<AsyncOneResponse<?>> responses;
 +        final ReadCommand command;
 +        final ConsistencyLevel consistency;
 +
 +        public BlockingPartitionRepair(int expectedResponses, ReadCommand command, ConsistencyLevel consistency)
 +        {
 +            this.responses = new ArrayList<>(expectedResponses);
 +            this.command = command;
 +            this.consistency = consistency;
 +        }
 +
 +        private AsyncOneResponse sendRepairMutation(Mutation mutation, InetAddressAndPort destination)
 +        {
 +            DecoratedKey key = mutation.key();
 +            Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
 +            int messagingVersion = MessagingService.instance().getVersion(destination);
 +
 +            int    mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion);
 +            int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
 +
 +            AsyncOneResponse callback = null;
 +
 +            if (mutationSize <= maxMutationSize)
 +            {
 +                Tracing.trace("Sending read-repair-mutation to {}", destination);
 +                // use a separate verb here to avoid writing hints on timeouts
 +                MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
 +                callback = MessagingService.instance().sendRR(message, destination);
 +                ColumnFamilyStore.metricsFor(command.metadata().id).readRepairRequests.mark();
 +            }
 +            else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
 +            {
 +                logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
 +                             mutationSize,
 +                             maxMutationSize,
 +                             command.metadata(),
 +                             command.metadata().partitionKeyType.getString(key.getKey()),
 +                             destination);
 +            }
 +            else
 +            {
 +                logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
 +                            mutationSize,
 +                            maxMutationSize,
 +                            command.metadata(),
 +                            command.metadata().partitionKeyType.getString(key.getKey()),
 +                            destination);
 +
 +                int blockFor = consistency.blockFor(keyspace);
 +                Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
 +                throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
 +            }
 +            return callback;
 +        }
 +
 +        public void reportMutation(InetAddressAndPort endpoint, Mutation mutation)
 +        {
 +            AsyncOneResponse<?> response = sendRepairMutation(mutation, endpoint);
 +
 +            if (response != null)
 +                responses.add(response);
 +        }
 +
 +        public void finish()
 +        {
 +            Futures.addCallback(Futures.allAsList(responses), new FutureCallback<List<Object>>()
 +            {
 +                public void onSuccess(@Nullable List<Object> result)
 +                {
 +                    set(result);
 +                }
 +
 +                public void onFailure(Throwable t)
 +                {
 +                    setException(t);
 +                }
 +            });
 +        }
 +    }
 +
 +    public void awaitRepairs(long timeout)
 +    {
 +        try
 +        {
 +            Futures.allAsList(repairs).get(timeout, TimeUnit.MILLISECONDS);
 +        }
 +        catch (TimeoutException ex)
 +        {
 +            // We got all responses, but timed out while repairing
 +            Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
 +            int blockFor = consistency.blockFor(keyspace);
 +            if (Tracing.isTracing())
 +                Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
 +            else
 +                logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
 +
 +            throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
 +        }
 +        catch (InterruptedException | ExecutionException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    public PartitionRepair startPartitionRepair()
 +    {
 +        BlockingPartitionRepair repair = new BlockingPartitionRepair(endpoints.size(), command, consistency);
 +        repairs.add(repair);
 +        return repair;
 +    }
 +
-     public void startForegroundRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
++    public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
 +    {
 +        ReadRepairMetrics.repairedBlocking.mark();
 +
 +        // Do a full data read to resolve the correct response (and repair nodes that need it)

 +        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
 +        DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, allEndpoints.size(), queryStartNanoTime, this);
 +        ReadCallback readCallback = new ReadCallback(resolver, ConsistencyLevel.ALL, contactedEndpoints.size(), command,
 +                                                     keyspace, allEndpoints, queryStartNanoTime, this);
 +
 +        digestRepair = new DigestRepair(resolver, readCallback, resultConsumer);
 +
 +        for (InetAddressAndPort endpoint : contactedEndpoints)
 +        {
 +            Tracing.trace("Enqueuing full data read to {}", endpoint);
 +            MessagingService.instance().sendRRWithFailure(command.createMessage(), endpoint, readCallback);
 +        }
 +    }
 +
-     public void awaitForegroundRepairFinish() throws ReadTimeoutException
++    public void awaitRepair() throws ReadTimeoutException
 +    {
 +        if (digestRepair != null)
 +        {
 +            digestRepair.readCallback.awaitResults();
 +            digestRepair.resultConsumer.accept(digestRepair.dataResolver.resolve());
 +        }
 +    }
- 
-     public void maybeStartBackgroundRepair(ResponseResolver resolver)
-     {
-         TraceState traceState = Tracing.instance.get();
-         if (traceState != null)
-             traceState.trace("Initiating read-repair");
-         StageManager.getStage(Stage.READ_REPAIR).execute(() -> resolver.evaluateAllResponses(traceState));
-     }
- 
-     public void backgroundDigestRepair(TraceState traceState)
-     {
-         if (traceState != null)
-             traceState.trace("Digest mismatch");
-         if (logger.isDebugEnabled())
-             logger.debug("Digest mismatch");
- 
-         ReadRepairMetrics.repairedBackground.mark();
- 
-         Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-         final DataResolver repairResolver = new DataResolver(keyspace, command, consistency, endpoints.size(), queryStartNanoTime, this);
-         AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
- 
-         for (InetAddressAndPort endpoint : endpoints)
-             MessagingService.instance().sendRR(command.createMessage(), endpoint, repairHandler);
-     }
 +}
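
sendRepairMutation() above makes a three-way decision: send the repair mutation if it fits, drop it with a debug log when the cassandra.drop_oversized_readrepair_mutations property is set, or warn and fail the read with a timeout otherwise. A reduced sketch of that branch with made-up sizes follows; the property name is the one used in the code above, while the size values and the "half a commitlog segment" default are only assumptions for the example.

public class OversizedRepairMutationSketch
{
    private static final boolean DROP_OVERSIZED =
        Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");

    public static void main(String[] args)
    {
        int mutationSize    = 20_000_000; // pretend serialized size of the repair mutation
        int maxMutationSize = 16_000_000; // assumed limit, e.g. half a commitlog segment

        if (mutationSize <= maxMutationSize)
            System.out.println("send the read-repair mutation and track the callback");
        else if (DROP_OVERSIZED)
            System.out.println("drop the oversized mutation and log at debug");
        else
            System.out.println("log at warn and fail the read with a ReadTimeoutException");
    }
}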

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
index 39f5bff,0000000..4436f3a
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@@ -1,62 -1,0 +1,49 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads.repair;
 +
 +import java.util.List;
 +import java.util.function.Consumer;
 +
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.exceptions.ReadTimeoutException;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.service.reads.DigestResolver;
- import org.apache.cassandra.service.reads.ResponseResolver;
- import org.apache.cassandra.tracing.TraceState;
 +
 +public class NoopReadRepair implements ReadRepair
 +{
 +    public static final NoopReadRepair instance = new NoopReadRepair();
 +
 +    private NoopReadRepair() {}
 +
 +    public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
 +    {
 +        return UnfilteredPartitionIterators.MergeListener.NOOP;
 +    }
 +
-     public void startForegroundRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
++    public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
 +    {
 +        resultConsumer.accept(digestResolver.getData());
 +    }
 +
-     public void awaitForegroundRepairFinish() throws ReadTimeoutException
++    public void awaitRepair() throws ReadTimeoutException
 +    {
- 
-     }
- 
-     public void maybeStartBackgroundRepair(ResponseResolver resolver)
-     {
- 
-     }
- 
-     public void backgroundDigestRepair(TraceState traceState)
-     {
- 
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
index 21cab20,0000000..289875d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
@@@ -1,73 -1,0 +1,56 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
- 
 +package org.apache.cassandra.service.reads.repair;
 +
 +import java.util.List;
 +import java.util.function.Consumer;
 +
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.exceptions.ReadTimeoutException;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.service.reads.DigestResolver;
- import org.apache.cassandra.service.reads.ResponseResolver;
- import org.apache.cassandra.tracing.TraceState;
 +
 +public interface ReadRepair
 +{
- 
 +    /**
 +     * Used by DataResolver to generate corrections as the partition iterator is consumed
 +     */
 +    UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints);
 +
 +    /**
 +     * Called when the digests from the initial read don't match. Reads may block on the
 +     * repair started by this method.
 +     */
-     public void startForegroundRepair(DigestResolver digestResolver,
-                                       List<InetAddressAndPort> allEndpoints,
-                                       List<InetAddressAndPort> contactedEndpoints,
-                                       Consumer<PartitionIterator> resultConsumer);
- 
-     /**
-      * Wait for any operations started by {@link ReadRepair#startForegroundRepair} to complete
-      * @throws ReadTimeoutException
-      */
-     public void awaitForegroundRepairFinish() throws ReadTimeoutException;
- 
-     /**
-      * Called when responses from all replicas have been received. Read will not block on this.
-      * @param resolver
-      */
-     public void maybeStartBackgroundRepair(ResponseResolver resolver);
++    public void startRepair(DigestResolver digestResolver,
++                            List<InetAddressAndPort> allEndpoints,
++                            List<InetAddressAndPort> contactedEndpoints,
++                            Consumer<PartitionIterator> resultConsumer);
 +
 +    /**
-      * If {@link ReadRepair#maybeStartBackgroundRepair} was called with a {@link DigestResolver}, this will
-      * be called to perform a repair if there was a digest mismatch
++     * Wait for any operations started by {@link ReadRepair#startRepair} to complete
 +     */
-     public void backgroundDigestRepair(TraceState traceState);
++    public void awaitRepair() throws ReadTimeoutException;
 +
 +    static ReadRepair create(ReadCommand command, List<InetAddressAndPort> endpoints, long queryStartNanoTime, ConsistencyLevel consistency)
 +    {
 +        return new BlockingReadRepair(command, endpoints, queryStartNanoTime, consistency);
 +    }
 +}
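
Taken together, the interface above is driven roughly as follows by the coordinator read path. This fragment is only a sketch inferred from the signatures and javadoc in this diff (the actual call sites are not part of this excerpt), so the surrounding variables are placeholders and it is not meant to compile on its own.

// Hypothetical coordinator-side usage, inferred from the interface above.
ReadRepair readRepair = ReadRepair.create(command, endpoints, queryStartNanoTime, consistency);

// ... send digest/data requests via ReadCallback and wait for blockFor responses ...

if (!digestResolver.responsesMatch())
{
    // Digests disagree: do a full data read against the replicas and repair as needed.
    readRepair.startRepair(digestResolver, allEndpoints, contactedEndpoints, resultConsumer);
    readRepair.awaitRepair(); // block until the repaired result can be handed back
}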

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index 487ed65,ac8b4f7..c29760e
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@@ -74,15 -67,10 +74,14 @@@ public final class TraceKeyspac
                  + "thread text,"
                  + "PRIMARY KEY ((session_id), event_id))");
  
 -    private static CFMetaData compile(String name, String description, String schema)
 +    private static TableMetadata parse(String table, String description, String cql)
      {
 -        return CFMetaData.compile(String.format(schema, name), SchemaConstants.TRACE_KEYSPACE_NAME)
 -                         .comment(description);
 +        return CreateTableStatement.parse(format(cql, table), SchemaConstants.TRACE_KEYSPACE_NAME)
 +                                   .id(TableId.forSystemTable(SchemaConstants.TRACE_KEYSPACE_NAME, table))
-                                    .dcLocalReadRepairChance(0.0)
 +                                   .gcGraceSeconds(0)
 +                                   .memtableFlushPeriod((int) TimeUnit.HOURS.toMillis(1))
 +                                   .comment(description)
 +                                   .build();
      }
  
      public static KeyspaceMetadata metadata()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/OverflowTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/validation/miscellaneous/OverflowTest.java
index 406f27a,6fdedc2..71d632d
--- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/OverflowTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/OverflowTest.java
@@@ -107,8 -107,8 +107,6 @@@ public class OverflowTest extends CQLTe
      {
          createTable("CREATE TABLE %s ( k int PRIMARY KEY, c int ) WITH "
                      + "comment = 'My comment' "
--                    + "AND read_repair_chance = 0.5 "
--                    + "AND dclocal_read_repair_chance = 0.5 "
                      + "AND gc_grace_seconds = 4 "
                      + "AND bloom_filter_fp_chance = 0.01 "
                      + "AND compaction = { 'class' : 'LeveledCompactionStrategy', 'sstable_size_in_mb' : 10, 'fanout_size' : 5 } "
@@@ -117,8 -117,8 +115,6 @@@
  
          execute("ALTER TABLE %s WITH "
                  + "comment = 'other comment' "
--                + "AND read_repair_chance = 0.3 "
--                + "AND dclocal_read_repair_chance = 0.3 "
                  + "AND gc_grace_seconds = 100 "
                  + "AND bloom_filter_fp_chance = 0.1 "
                  + "AND compaction = { 'class': 'SizeTieredCompactionStrategy', 'min_sstable_size' : 42 } "

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
index cfc9686,0000000..1e465b3
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
+++ b/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
@@@ -1,447 -1,0 +1,443 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db;
 +
 +import java.io.FileReader;
 +import java.nio.ByteBuffer;
 +import java.nio.charset.Charset;
 +import java.util.*;
 +
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.ImmutableMap;
 +import com.google.common.io.Files;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.*;
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.statements.*;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.exceptions.*;
 +import org.apache.cassandra.index.sasi.*;
 +import org.apache.cassandra.schema.*;
 +import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
 +import org.apache.cassandra.utils.*;
 +import org.json.simple.JSONArray;
 +import org.json.simple.JSONObject;
 +import org.json.simple.parser.JSONParser;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertTrue;
 +
 +public class TableCQLHelperTest extends CQLTester
 +{
 +    @Before
 +    public void defineSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.prepareServer();
 +    }
 +
 +    @Test
 +    public void testUserTypesCQL()
 +    {
 +        String keyspace = "cql_test_keyspace_user_types";
 +        String table = "test_table_user_types";
 +
 +        UserType typeA = new UserType(keyspace, ByteBufferUtil.bytes("a"),
 +                                      Arrays.asList(FieldIdentifier.forUnquoted("a1"),
 +                                                    FieldIdentifier.forUnquoted("a2"),
 +                                                    FieldIdentifier.forUnquoted("a3")),
 +                                      Arrays.asList(IntegerType.instance,
 +                                                    IntegerType.instance,
 +                                                    IntegerType.instance),
 +                                      true);
 +
 +        UserType typeB = new UserType(keyspace, ByteBufferUtil.bytes("b"),
 +                                      Arrays.asList(FieldIdentifier.forUnquoted("b1"),
 +                                                    FieldIdentifier.forUnquoted("b2"),
 +                                                    FieldIdentifier.forUnquoted("b3")),
 +                                      Arrays.asList(typeA,
 +                                                    typeA,
 +                                                    typeA),
 +                                      true);
 +
 +        UserType typeC = new UserType(keyspace, ByteBufferUtil.bytes("c"),
 +                                      Arrays.asList(FieldIdentifier.forUnquoted("c1"),
 +                                                    FieldIdentifier.forUnquoted("c2"),
 +                                                    FieldIdentifier.forUnquoted("c3")),
 +                                      Arrays.asList(typeB,
 +                                                    typeB,
 +                                                    typeB),
 +                                      true);
 +
 +        TableMetadata cfm =
 +            TableMetadata.builder(keyspace, table)
 +                         .addPartitionKeyColumn("pk1", IntegerType.instance)
 +                         .addClusteringColumn("ck1", IntegerType.instance)
 +                         .addRegularColumn("reg1", typeC)
 +                         .addRegularColumn("reg2", ListType.getInstance(IntegerType.instance, false))
 +                         .addRegularColumn("reg3", MapType.getInstance(AsciiType.instance, IntegerType.instance, true))
 +                         .build();
 +
 +        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), Tables.of(cfm), Types.of(typeA, typeB, typeC));
 +
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 +
 +        assertEquals(ImmutableList.of("CREATE TYPE cql_test_keyspace_user_types.a (a1 varint, a2 varint, a3 varint);",
 +                                      "CREATE TYPE cql_test_keyspace_user_types.b (b1 a, b2 a, b3 a);",
 +                                      "CREATE TYPE cql_test_keyspace_user_types.c (c1 b, c2 b, c3 b);"),
 +                     TableCQLHelper.getUserTypesAsCQL(cfs.metadata()));
 +    }
 +
 +    @Test
 +    public void testDroppedColumnsCQL()
 +    {
 +        String keyspace = "cql_test_keyspace_dropped_columns";
 +        String table = "test_table_dropped_columns";
 +
 +        TableMetadata.Builder builder =
 +            TableMetadata.builder(keyspace, table)
 +                         .addPartitionKeyColumn("pk1", IntegerType.instance)
 +                         .addClusteringColumn("ck1", IntegerType.instance)
 +                         .addRegularColumn("reg1", IntegerType.instance)
 +                         .addRegularColumn("reg2", IntegerType.instance)
 +                         .addRegularColumn("reg3", IntegerType.instance);
 +
 +        ColumnMetadata reg1 = builder.getColumn(ByteBufferUtil.bytes("reg1"));
 +        ColumnMetadata reg2 = builder.getColumn(ByteBufferUtil.bytes("reg2"));
 +        ColumnMetadata reg3 = builder.getColumn(ByteBufferUtil.bytes("reg3"));
 +
 +        builder.removeRegularOrStaticColumn(reg1.name)
 +               .removeRegularOrStaticColumn(reg2.name)
 +               .removeRegularOrStaticColumn(reg3.name);
 +
 +        builder.recordColumnDrop(reg1, 10000)
 +               .recordColumnDrop(reg2, 20000)
 +               .recordColumnDrop(reg3, 30000);
 +
 +        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
 +
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 +
 +        assertEquals(ImmutableList.of("ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg1 USING TIMESTAMP 10000;",
 +                                      "ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg3 USING TIMESTAMP 30000;",
 +                                      "ALTER TABLE cql_test_keyspace_dropped_columns.test_table_dropped_columns DROP reg2 USING TIMESTAMP 20000;"),
 +                     TableCQLHelper.getDroppedColumnsAsCQL(cfs.metadata()));
 +
 +        assertTrue(TableCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
 +        "CREATE TABLE IF NOT EXISTS cql_test_keyspace_dropped_columns.test_table_dropped_columns (\n" +
 +        "\tpk1 varint,\n" +
 +        "\tck1 varint,\n" +
 +        "\treg1 varint,\n" +
 +        "\treg3 varint,\n" +
 +        "\treg2 varint,\n" +
 +        "\tPRIMARY KEY (pk1, ck1))"));
 +    }
 +
 +    @Test
 +    public void testReaddedColumns()
 +    {
 +        String keyspace = "cql_test_keyspace_readded_columns";
 +        String table = "test_table_readded_columns";
 +
 +        TableMetadata.Builder builder =
 +            TableMetadata.builder(keyspace, table)
 +                         .addPartitionKeyColumn("pk1", IntegerType.instance)
 +                         .addClusteringColumn("ck1", IntegerType.instance)
 +                         .addRegularColumn("reg1", IntegerType.instance)
 +                         .addStaticColumn("reg2", IntegerType.instance)
 +                         .addRegularColumn("reg3", IntegerType.instance);
 +
 +        ColumnMetadata reg1 = builder.getColumn(ByteBufferUtil.bytes("reg1"));
 +        ColumnMetadata reg2 = builder.getColumn(ByteBufferUtil.bytes("reg2"));
 +
 +        builder.removeRegularOrStaticColumn(reg1.name);
 +        builder.removeRegularOrStaticColumn(reg2.name);
 +
 +        builder.recordColumnDrop(reg1, 10000);
 +        builder.recordColumnDrop(reg2, 20000);
 +
 +        builder.addColumn(reg1);
 +        builder.addColumn(reg2);
 +
 +        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
 +
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 +
 +        // when re-adding, the column appears in CREATE, then in DROP, then in ADD again, so the DROP is recorded with the proper timestamp
 +        assertTrue(TableCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
 +        "CREATE TABLE IF NOT EXISTS cql_test_keyspace_readded_columns.test_table_readded_columns (\n" +
 +        "\tpk1 varint,\n" +
 +        "\tck1 varint,\n" +
 +        "\treg2 varint static,\n" +
 +        "\treg1 varint,\n" +
 +        "\treg3 varint,\n" +
 +        "\tPRIMARY KEY (pk1, ck1))"));
 +
 +        assertEquals(ImmutableList.of("ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns DROP reg1 USING TIMESTAMP 10000;",
 +                                      "ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns ADD reg1 varint;",
 +                                      "ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns DROP reg2 USING TIMESTAMP 20000;",
 +                                      "ALTER TABLE cql_test_keyspace_readded_columns.test_table_readded_columns ADD reg2 varint static;"),
 +                     TableCQLHelper.getDroppedColumnsAsCQL(cfs.metadata()));
 +    }
 +
 +    @Test
 +    public void testCfmColumnsCQL()
 +    {
 +        String keyspace = "cql_test_keyspace_create_table";
 +        String table = "test_table_create_table";
 +
 +        TableMetadata.Builder metadata =
 +            TableMetadata.builder(keyspace, table)
 +                         .addPartitionKeyColumn("pk1", IntegerType.instance)
 +                         .addPartitionKeyColumn("pk2", AsciiType.instance)
 +                         .addClusteringColumn("ck1", ReversedType.getInstance(IntegerType.instance))
 +                         .addClusteringColumn("ck2", IntegerType.instance)
 +                         .addStaticColumn("st1", AsciiType.instance)
 +                         .addRegularColumn("reg1", AsciiType.instance)
 +                         .addRegularColumn("reg2", ListType.getInstance(IntegerType.instance, false))
 +                         .addRegularColumn("reg3", MapType.getInstance(AsciiType.instance, IntegerType.instance, true));
 +
 +        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), metadata);
 +
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 +
 +        assertTrue(TableCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).startsWith(
 +        "CREATE TABLE IF NOT EXISTS cql_test_keyspace_create_table.test_table_create_table (\n" +
 +        "\tpk1 varint,\n" +
 +        "\tpk2 ascii,\n" +
 +        "\tck1 varint,\n" +
 +        "\tck2 varint,\n" +
 +        "\tst1 ascii static,\n" +
 +        "\treg1 ascii,\n" +
 +        "\treg2 frozen<list<varint>>,\n" +
 +        "\treg3 map<ascii, varint>,\n" +
 +        "\tPRIMARY KEY ((pk1, pk2), ck1, ck2))\n" +
 +        "\tWITH ID = " + cfs.metadata.id + "\n" +
 +        "\tAND CLUSTERING ORDER BY (ck1 DESC, ck2 ASC)"));
 +    }
 +
 +    @Test
 +    public void testCfmOptionsCQL()
 +    {
 +        String keyspace = "cql_test_keyspace_options";
 +        String table = "test_table_options";
 +
 +        TableMetadata.Builder builder = TableMetadata.builder(keyspace, table);
 +        builder.addPartitionKeyColumn("pk1", IntegerType.instance)
 +               .addClusteringColumn("cl1", IntegerType.instance)
 +               .addRegularColumn("reg1", AsciiType.instance)
 +               .bloomFilterFpChance(1.0)
 +               .comment("comment")
 +               .compaction(CompactionParams.lcs(Collections.singletonMap("sstable_size_in_mb", "1")))
 +               .compression(CompressionParams.lz4(1 << 16, 1 << 15))
-                .dcLocalReadRepairChance(0.2)
 +               .crcCheckChance(0.3)
 +               .defaultTimeToLive(4)
 +               .gcGraceSeconds(5)
 +               .minIndexInterval(6)
 +               .maxIndexInterval(7)
 +               .memtableFlushPeriod(8)
-                .readRepairChance(0.9)
 +               .speculativeRetry(AlwaysSpeculativeRetryPolicy.INSTANCE)
 +               .extensions(ImmutableMap.of("ext1", ByteBuffer.wrap("val1".getBytes())))
 +               .recordColumnDrop(ColumnMetadata.regularColumn(keyspace, table, "reg1", AsciiType.instance),
 +                                 FBUtilities.timestampMicros());
 +
 +        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
 +
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 +
 +        assertTrue(TableCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true).endsWith(
 +        "AND bloom_filter_fp_chance = 1.0\n" +
-         "\tAND dclocal_read_repair_chance = 0.2\n" +
 +        "\tAND crc_check_chance = 0.3\n" +
 +        "\tAND default_time_to_live = 4\n" +
 +        "\tAND gc_grace_seconds = 5\n" +
 +        "\tAND min_index_interval = 6\n" +
 +        "\tAND max_index_interval = 7\n" +
 +        "\tAND memtable_flush_period_in_ms = 8\n" +
-         "\tAND read_repair_chance = 0.9\n" +
 +        "\tAND speculative_retry = 'ALWAYS'\n" +
 +        "\tAND comment = 'comment'\n" +
 +        "\tAND caching = { 'keys': 'ALL', 'rows_per_partition': 'NONE' }\n" +
 +        "\tAND compaction = { 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'sstable_size_in_mb': '1' }\n" +
 +        "\tAND compression = { 'chunk_length_in_kb': '64', 'min_compress_ratio': '2.0', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor' }\n" +
 +        "\tAND cdc = false\n" +
 +        "\tAND extensions = { 'ext1': 0x76616c31 };"
 +        ));
 +    }
 +
 +    @Test
 +    public void testCfmIndexJson()
 +    {
 +        String keyspace = "cql_test_keyspace_3";
 +        String table = "test_table_3";
 +
 +        TableMetadata.Builder builder =
 +            TableMetadata.builder(keyspace, table)
 +                         .addPartitionKeyColumn("pk1", IntegerType.instance)
 +                         .addClusteringColumn("cl1", IntegerType.instance)
 +                         .addRegularColumn("reg1", AsciiType.instance);
 +
 +        ColumnIdentifier reg1 = ColumnIdentifier.getInterned("reg1", true);
 +
 +        builder.indexes(
 +            Indexes.of(IndexMetadata.fromIndexTargets(
 +            Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.VALUES)),
 +                                                      "indexName",
 +                                                      IndexMetadata.Kind.COMPOSITES,
 +                                                      Collections.emptyMap()),
 +                       IndexMetadata.fromIndexTargets(
 +                       Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS)),
 +                                                      "indexName2",
 +                                                      IndexMetadata.Kind.COMPOSITES,
 +                                                      Collections.emptyMap()),
 +                       IndexMetadata.fromIndexTargets(
 +                       Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS_AND_VALUES)),
 +                                                      "indexName3",
 +                                                      IndexMetadata.Kind.COMPOSITES,
 +                                                      Collections.emptyMap()),
 +                       IndexMetadata.fromIndexTargets(
 +                       Collections.singletonList(new IndexTarget(reg1, IndexTarget.Type.KEYS_AND_VALUES)),
 +                                                      "indexName4",
 +                                                      IndexMetadata.Kind.CUSTOM,
 +                                                      Collections.singletonMap(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName()))));
 +
 +
 +        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), builder);
 +
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 +
 +        assertEquals(ImmutableList.of("CREATE INDEX \"indexName\" ON cql_test_keyspace_3.test_table_3 (values(reg1));",
 +                                      "CREATE INDEX \"indexName2\" ON cql_test_keyspace_3.test_table_3 (keys(reg1));",
 +                                      "CREATE INDEX \"indexName3\" ON cql_test_keyspace_3.test_table_3 (entries(reg1));",
 +                                      "CREATE CUSTOM INDEX \"indexName4\" ON cql_test_keyspace_3.test_table_3 (entries(reg1)) USING 'org.apache.cassandra.index.sasi.SASIIndex';"),
 +                     TableCQLHelper.getIndexesAsCQL(cfs.metadata()));
 +    }
 +
 +    private final static String SNAPSHOT = "testsnapshot";
 +
 +    @Test
 +    public void testSnapshot() throws Throwable
 +    {
 +        String typeA = createType("CREATE TYPE %s (a1 varint, a2 varint, a3 varint);");
 +        String typeB = createType("CREATE TYPE %s (b1 frozen<" + typeA + ">, b2 frozen<" + typeA + ">, b3 frozen<" + typeA + ">);");
 +        String typeC = createType("CREATE TYPE %s (c1 frozen<" + typeB + ">, c2 frozen<" + typeB + ">, c3 frozen<" + typeB + ">);");
 +
 +        String tableName = createTable("CREATE TABLE IF NOT EXISTS %s (" +
 +                                       "pk1 varint," +
 +                                       "pk2 ascii," +
 +                                       "ck1 varint," +
 +                                       "ck2 varint," +
 +                                       "reg1 " + typeC + "," +
 +                                       "reg2 int," +
 +                                       "reg3 int," +
 +                                       "PRIMARY KEY ((pk1, pk2), ck1, ck2)) WITH " +
 +                                       "CLUSTERING ORDER BY (ck1 ASC, ck2 DESC);");
 +
 +        alterTable("ALTER TABLE %s DROP reg3 USING TIMESTAMP 10000;");
 +        alterTable("ALTER TABLE %s ADD reg3 int;");
 +
 +        for (int i = 0; i < 10; i++)
 +            execute("INSERT INTO %s (pk1, pk2, ck1, ck2, reg1, reg2) VALUES (?, ?, ?, ?, ?, ?)", i, i + 1, i + 2, i + 3, null, i + 5);
 +
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
 +        cfs.snapshot(SNAPSHOT);
 +
 +        String schema = Files.toString(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT), Charset.defaultCharset());
 +        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s (a1 varint, a2 varint, a3 varint);", keyspace(), typeA)));
 +        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s (b1 frozen<%s>, b2 frozen<%s>, b3 frozen<%s>);", keyspace(), typeB, typeA, typeA, typeA)));
 +        assertTrue(schema.contains(String.format("CREATE TYPE %s.%s (c1 frozen<%s>, c2 frozen<%s>, c3 frozen<%s>);", keyspace(), typeC, typeB, typeB, typeB)));
 +
 +        schema = schema.substring(schema.indexOf("CREATE TABLE")); // trim to ensure order
 +
 +        assertTrue(schema.startsWith("CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
 +                                     "\tpk1 varint,\n" +
 +                                     "\tpk2 ascii,\n" +
 +                                     "\tck1 varint,\n" +
 +                                     "\tck2 varint,\n" +
 +                                     "\treg2 int,\n" +
 +                                     "\treg3 int,\n" +
 +                                     "\treg1 " + typeC + ",\n" +
 +                                     "\tPRIMARY KEY ((pk1, pk2), ck1, ck2))\n" +
 +                                     "\tWITH ID = " + cfs.metadata.id + "\n" +
 +                                     "\tAND CLUSTERING ORDER BY (ck1 ASC, ck2 DESC)"));
 +
 +        schema = schema.substring(schema.indexOf("ALTER"));
 +        assertTrue(schema.startsWith(String.format("ALTER TABLE %s.%s DROP reg3 USING TIMESTAMP 10000;", keyspace(), tableName)));
 +        assertTrue(schema.contains(String.format("ALTER TABLE %s.%s ADD reg3 int;", keyspace(), tableName)));
 +
 +        JSONObject manifest = (JSONObject) new JSONParser().parse(new FileReader(cfs.getDirectories().getSnapshotManifestFile(SNAPSHOT)));
 +        JSONArray files = (JSONArray) manifest.get("files");
 +        Assert.assertEquals(1, files.size());
 +    }
 +
 +    @Test
 +    public void testSystemKsSnapshot() throws Throwable
 +    {
 +        ColumnFamilyStore cfs = Keyspace.open("system").getColumnFamilyStore("peers");
 +        cfs.snapshot(SNAPSHOT);
 +
 +        Assert.assertTrue(cfs.getDirectories().getSnapshotManifestFile(SNAPSHOT).exists());
 +        Assert.assertFalse(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT).exists());
 +    }
 +
 +    @Test
 +    public void testDroppedType() throws Throwable
 +    {
 +        String typeA = createType("CREATE TYPE %s (a1 varint, a2 varint, a3 varint);");
 +        String typeB = createType("CREATE TYPE %s (b1 frozen<" + typeA + ">, b2 frozen<" + typeA + ">, b3 frozen<" + typeA + ">);");
 +
 +        String tableName = createTable("CREATE TABLE IF NOT EXISTS %s (" +
 +                                       "pk1 varint," +
 +                                       "ck1 varint," +
 +                                       "reg1 " + typeB + "," +
 +                                       "reg2 varint," +
 +                                       "PRIMARY KEY (pk1, ck1));");
 +
 +        alterTable("ALTER TABLE %s DROP reg1 USING TIMESTAMP 10000;");
 +
 +        Runnable validate = () -> {
 +            try
 +            {
 +                ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(tableName);
 +                cfs.snapshot(SNAPSHOT);
 +                String schema = Files.toString(cfs.getDirectories().getSnapshotSchemaFile(SNAPSHOT), Charset.defaultCharset());
 +
 +                // When both the column and its type are dropped, the type in the column definition is substituted with a tuple
 +                assertTrue(schema.startsWith("CREATE TABLE IF NOT EXISTS " + keyspace() + "." + tableName + " (\n" +
 +                                             "\tpk1 varint,\n" +
 +                                             "\tck1 varint,\n" +
 +                                             "\treg2 varint,\n" +
 +                                             "\treg1 frozen<tuple<frozen<tuple<varint, varint, varint>>, frozen<tuple<varint, varint, varint>>, frozen<tuple<varint, varint, varint>>>>,\n" +
 +                                             "\tPRIMARY KEY (pk1, ck1))"));
 +                assertTrue(schema.contains("ALTER TABLE " + keyspace() + "." + tableName + " DROP reg1 USING TIMESTAMP 10000;"));
 +            }
 +            catch (Exception e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +        };
 +
 +        // Validate before and after the type drop
 +        validate.run();
 +        schemaChange("DROP TYPE " + keyspace() + "." + typeB);
 +        schemaChange("DROP TYPE " + keyspace() + "." + typeA);
 +        validate.run();
 +    }
 +}
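For readers following the assertions above, here is a minimal sketch of how the TableCQLHelper calls exercised by these tests could be combined to reconstruct a table's schema as CQL. Only the method names and signatures visible in this diff (getUserTypesAsCQL, getTableMetadataAsCQL, getDroppedColumnsAsCQL, getIndexesAsCQL, plus Keyspace/ColumnFamilyStore access) are taken from the source; the wrapper class, its name, and the import locations are assumptions for illustration, not part of the commit.

// Sketch only: combines the helper calls shown in TableCQLHelperTest to dump a
// table's schema. Package locations are assumed from the test's own package.
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.TableCQLHelper;

public class SchemaDumpSketch
{
    public static String dumpTable(String keyspaceName, String tableName)
    {
        ColumnFamilyStore cfs = Keyspace.open(keyspaceName).getColumnFamilyStore(tableName);
        StringBuilder sb = new StringBuilder();

        // User-defined types referenced by the table, in creation order.
        for (String typeCql : TableCQLHelper.getUserTypesAsCQL(cfs.metadata()))
            sb.append(typeCql).append('\n');

        // CREATE TABLE statement, including the table ID and options
        // (the boolean mirrors the tests' includeDroppedColumns-style flag).
        sb.append(TableCQLHelper.getTableMetadataAsCQL(cfs.metadata(), true)).append('\n');

        // ALTER TABLE ... DROP / ADD statements recording dropped (and re-added) columns.
        for (String alterCql : TableCQLHelper.getDroppedColumnsAsCQL(cfs.metadata()))
            sb.append(alterCql).append('\n');

        // CREATE INDEX statements.
        for (String indexCql : TableCQLHelper.getIndexesAsCQL(cfs.metadata()))
            sb.append(indexCql).append('\n');

        return sb.toString();
    }
}

This is the same ordering the snapshot-schema assertions above expect: types first, then the CREATE TABLE statement, then the ALTER statements for dropped columns, then indexes.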



