cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [5/9] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Date Mon, 17 Feb 2014 17:13:02 GMT
Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
	src/java/org/apache/cassandra/service/AntiEntropyService.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4b50b2b2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4b50b2b2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4b50b2b2

Branch: refs/heads/trunk
Commit: 4b50b2b2e41aeda86c166059826e8eb1498b24fc
Parents: 44cf4a6 6dfca3d
Author: Yuki Morishita <yukim@apache.org>
Authored: Mon Feb 17 11:12:26 2014 -0600
Committer: Yuki Morishita <yukim@apache.org>
Committed: Mon Feb 17 11:12:26 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                         | 1 +
 src/java/org/apache/cassandra/repair/RepairJob.java | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b50b2b2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c9fabd2,f146166..bdfec11
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -25,33 -9,24 +25,34 @@@ Merged from 1.2
   * Use real node messaging versions for schema exchange decisions (CASSANDRA-6700)
   * IN on the last clustering columns + ORDER BY DESC yield no results (CASSANDRA-6701)
   * Fix SecondaryIndexManager#deleteFromIndexes() (CASSANDRA-6711)
+  * Fix snapshot repair not snapshotting coordinator itself (CASSANDRA-6713)
  
 -
 -1.2.15
 - * Move handling of migration event source to solve bootstrap race (CASSANDRA-6648)
 - * Make sure compaction throughput value doesn't overflow with int math (CASSANDRA-6647)
 -
 -
 -1.2.14
 - * Reverted code to limit CQL prepared statement cache by size (CASSANDRA-6592)
 - * add cassandra.default_messaging_version property to allow easier
 -   upgrading from 1.1 (CASSANDRA-6619)
 - * Allow executing CREATE statements multiple times (CASSANDRA-6471)
 - * Don't send confusing info with timeouts (CASSANDRA-6491)
 - * Don't resubmit counter mutation runnables internally (CASSANDRA-6427)
 - * Don't drop local mutations without a hint (CASSANDRA-6510)
 - * Don't allow null max_hint_window_in_ms (CASSANDRA-6419)
 - * Validate SliceRange start and finish lengths (CASSANDRA-6521)
 +2.0.5
 + * Reduce garbage generated by bloom filter lookups (CASSANDRA-6609)
 + * Add ks.cf names to tombstone logging (CASSANDRA-6597)
 + * Use LOCAL_QUORUM for LWT operations at LOCAL_SERIAL (CASSANDRA-6495)
 + * Wait for gossip to settle before accepting client connections (CASSANDRA-4288)
 + * Delete unfinished compaction incrementally (CASSANDRA-6086)
 + * Allow specifying custom secondary index options in CQL3 (CASSANDRA-6480)
 + * Improve replica pinning for cache efficiency in DES (CASSANDRA-6485)
 + * Fix LOCAL_SERIAL from thrift (CASSANDRA-6584)
 + * Don't special case received counts in CAS timeout exceptions (CASSANDRA-6595)
 + * Add support for 2.1 global counter shards (CASSANDRA-6505)
 + * Fix NPE when streaming connection is not yet established (CASSANDRA-6210)
 + * Avoid rare duplicate read repair triggering (CASSANDRA-6606)
 + * Fix paging discardFirst (CASSANDRA-6555)
 + * Fix ArrayIndexOutOfBoundsException in 2ndary index query (CASSANDRA-6470)
 + * Release sstables upon rebuilding 2i (CASSANDRA-6635)
 + * Add AbstractCompactionStrategy.startup() method (CASSANDRA-6637)
 + * SSTableScanner may skip rows during cleanup (CASSANDRA-6638)
 + * sstables from stalled repair sessions can resurrect deleted data (CASSANDRA-6503)
 + * Switch stress to use ITransportFactory (CASSANDRA-6641)
 + * Fix IllegalArgumentException during prepare (CASSANDRA-6592)
 + * Fix possible loss of 2ndary index entries during compaction (CASSANDRA-6517)
 + * Fix direct Memory on architectures that do not support unaligned long access
 +   (CASSANDRA-6628)
 + * Let scrub optionally skip broken counter partitions (CASSANDRA-5930)
 +Merged from 1.2:
   * fsync compression metadata (CASSANDRA-6531)
   * Validate CF existence on execution for prepared statement (CASSANDRA-6535)
   * Add ability to throttle batchlog replay (CASSANDRA-6550)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b50b2b2/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairJob.java
index 16daf4e,0000000..6705c95
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@@ -1,224 -1,0 +1,224 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.repair;
 +
 +import java.net.InetAddress;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.UUID;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.locks.Condition;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.db.SnapshotCommand;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.net.IAsyncCallback;
 +import org.apache.cassandra.net.MessageIn;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.repair.messages.ValidationRequest;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.MerkleTree;
 +import org.apache.cassandra.utils.SimpleCondition;
 +
 +/**
 + * RepairJob runs repair on given ColumnFamily.
 + */
 +public class RepairJob
 +{
 +    private static Logger logger = LoggerFactory.getLogger(RepairJob.class);
 +
 +    public final RepairJobDesc desc;
 +    private final boolean isSequential;
 +    // first we send tree requests. this tracks the endpoints remaining to hear from
 +    private final RequestCoordinator<InetAddress> treeRequests;
 +    // tree responses are then tracked here
 +    private final List<TreeResponse> trees = new ArrayList<>();
 +    // once all responses are received, each tree is compared with each other, and differencer tasks
 +    // are submitted. the job is done when all differencers are complete.
 +    private final RequestCoordinator<Differencer> differencers;
 +    private final Condition requestsSent = new SimpleCondition();
 +    private CountDownLatch snapshotLatch = null;
 +    private int gcBefore = -1;
 +
 +    private volatile boolean failed = false;
 +
 +    /**
 +     * Create repair job to run on specific columnfamily
 +     */
 +    public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
 +    {
 +        this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
 +        this.isSequential = isSequential;
 +        this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
 +        {
 +            public void send(InetAddress endpoint)
 +            {
 +                ValidationRequest request = new ValidationRequest(desc, gcBefore);
 +                MessagingService.instance().sendOneWay(request.createMessage(), endpoint);
 +            }
 +        };
 +        this.differencers = new RequestCoordinator<Differencer>(isSequential)
 +        {
 +            public void send(Differencer d)
 +            {
 +                StageManager.getStage(Stage.ANTI_ENTROPY).execute(d);
 +            }
 +        };
 +    }
 +
 +    /**
 +     * @return true if this job failed
 +     */
 +    public boolean isFailed()
 +    {
 +        return failed;
 +    }
 +
 +    /**
 +     * Send merkle tree request to every involved neighbor.
 +     */
 +    public void sendTreeRequests(Collection<InetAddress> endpoints)
 +    {
 +        // send requests to all nodes
 +        List<InetAddress> allEndpoints = new ArrayList<>(endpoints);
 +        allEndpoints.add(FBUtilities.getBroadcastAddress());
 +
 +        if (isSequential)
-             makeSnapshots(endpoints);
++            makeSnapshots(allEndpoints);
 +
 +        this.gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
 +
 +        for (InetAddress endpoint : allEndpoints)
 +            treeRequests.add(endpoint);
 +
 +        logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", desc.sessionId, desc.columnFamily, allEndpoints));
 +        treeRequests.start();
 +        requestsSent.signalAll();
 +    }
 +
 +    public void makeSnapshots(Collection<InetAddress> endpoints)
 +    {
 +        try
 +        {
 +            snapshotLatch = new CountDownLatch(endpoints.size());
 +            IAsyncCallback callback = new IAsyncCallback()
 +            {
 +                public boolean isLatencyForSnitch()
 +                {
 +                    return false;
 +                }
 +
 +                public void response(MessageIn msg)
 +                {
 +                    RepairJob.this.snapshotLatch.countDown();
 +                }
 +            };
 +            for (InetAddress endpoint : endpoints)
 +                MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace, desc.columnFamily, desc.sessionId.toString(), false).createMessage(), endpoint, callback);
 +            snapshotLatch.await();
 +            snapshotLatch = null;
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    /**
 +     * Add a new received tree and return the number of remaining tree to
 +     * be received for the job to be complete.
 +     *
 +     * Callers may assume exactly one addTree call will result in zero remaining endpoints.
 +     *
 +     * @param endpoint address of the endpoint that sent response
 +     * @param tree sent Merkle tree or null if validation failed on endpoint
 +     * @return the number of responses waiting to receive
 +     */
 +    public synchronized int addTree(InetAddress endpoint, MerkleTree tree)
 +    {
 +        // Wait for all request to have been performed (see #3400)
 +        try
 +        {
 +            requestsSent.await();
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new AssertionError("Interrupted while waiting for requests to be sent");
 +        }
 +
 +        if (tree == null)
 +            failed = true;
 +        else
 +            trees.add(new TreeResponse(endpoint, tree));
 +        return treeRequests.completed(endpoint);
 +    }
 +
 +    /**
 +     * Submit differencers for running.
 +     * All tree *must* have been received before this is called.
 +     */
 +    public void submitDifferencers()
 +    {
 +        assert !failed;
 +
 +        // We need to difference all trees one against another
 +        for (int i = 0; i < trees.size() - 1; ++i)
 +        {
 +            TreeResponse r1 = trees.get(i);
 +            for (int j = i + 1; j < trees.size(); ++j)
 +            {
 +                TreeResponse r2 = trees.get(j);
 +                Differencer differencer = new Differencer(desc, r1, r2);
 +                logger.debug("Queueing comparison {}", differencer);
 +                differencers.add(differencer);
 +            }
 +        }
 +        differencers.start();
 +        trees.clear(); // allows gc to do its thing
 +    }
 +
 +    /**
 +     * @return true if the given node pair was the last remaining
 +     */
 +    synchronized boolean completedSynchronization(NodePair nodes, boolean success)
 +    {
 +        if (!success)
 +            failed = true;
 +        Differencer completed = new Differencer(desc, new TreeResponse(nodes.endpoint1, null), new TreeResponse(nodes.endpoint2, null));
 +        return differencers.completed(completed) == 0;
 +    }
 +
 +    /**
 +     * terminate this job.
 +     */
 +    public void terminate()
 +    {
 +        if (snapshotLatch != null)
 +        {
 +            while (snapshotLatch.getCount() > 0)
 +                snapshotLatch.countDown();
 +        }
 +    }
 +}


Mime
View raw message