cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [09/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Date Thu, 26 May 2016 06:34:00 GMT
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/118bea59
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/118bea59
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/118bea59

Branch: refs/heads/cassandra-3.7
Commit: 118bea59e4a0bdc0b9cca9d717685f1157764c15
Parents: 2bacc9a 0318046
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Thu May 26 08:18:21 2016 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Thu May 26 08:18:21 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../repair/RepairMessageVerbHandler.java        |  9 +-
 .../apache/cassandra/repair/RepairRunnable.java |  2 +-
 .../cassandra/service/ActiveRepairService.java  | 92 ++++++++++++++++++--
 .../LeveledCompactionStrategyTest.java          |  2 +-
 .../cassandra/repair/LocalSyncTaskTest.java     |  3 +-
 .../service/ActiveRepairServiceTest.java        |  2 +-
 7 files changed, 95 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/118bea59/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6a952c4,f73db6e..190c2fa
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,24 -1,5 +1,25 @@@
 -2.1.15
 +2.2.7
 + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
 + * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
 + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
 + * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
 + * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
 + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
 + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510)
 + * JSON datetime formatting needs timezone (CASSANDRA-11137)
 + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
 + * Remove unnescessary file existence check during anticompaction (CASSANDRA-11660)
 + * Add missing files to debian packages (CASSANDRA-11642)
 + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621)
 + * cqlsh: COPY FROM should use regular inserts for single statement batches and
 +   report errors correctly if workers processes crash on initialization (CASSANDRA-11474)
 + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
 + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
 +Merged from 2.1:
+  * Clear out parent repair session if repair coordinator dies (CASSANDRA-11824)
   * Set default streaming_socket_timeout_in_ms to 24 hours (CASSANDRA-11840)
   * Do not consider local node a valid source during replace (CASSANDRA-11848)
   * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/118bea59/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 8a0706a,7debc93..6e7922f
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@@ -66,26 -66,14 +66,27 @@@ public class RepairMessageVerbHandler i
                      List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.cfIds.size());
                      for (UUID cfId : prepareMessage.cfIds)
                      {
 -                        Pair<String, String> kscf = Schema.instance.getCF(cfId);
 -                        ColumnFamilyStore columnFamilyStore = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 +                        ColumnFamilyStore columnFamilyStore = ColumnFamilyStore.getIfExists(cfId);
 +                        if (columnFamilyStore == null)
 +                        {
 +                            logErrorAndSendFailureResponse(String.format("Table with id %s was dropped during prepare phase of repair",
 +                                                                         cfId.toString()), message.from, id);
 +                            return;
 +                        }
                          columnFamilyStores.add(columnFamilyStore);
                      }
 +                    CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(message.from);
 +                    // note that we default isGlobal to true since old version always default to true:
 +                    boolean isGlobal = peerVersion == null ||
 +                                       peerVersion.compareTo(ActiveRepairService.SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) < 0 ||
 +                                       message.payload.messageType.equals(RepairMessage.Type.PREPARE_GLOBAL_MESSAGE);
 +                    logger.debug("Received prepare message: global message = {}, peerVersion = {},", message.payload.messageType.equals(RepairMessage.Type.PREPARE_GLOBAL_MESSAGE), peerVersion);
                      ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession,
-                             columnFamilyStores,
-                             prepareMessage.ranges,
-                             prepareMessage.isIncremental,
-                             isGlobal);
+                                                                              message.from,
+                                                                              columnFamilyStores,
 -                                                                             prepareMessage.ranges);
++                                                                             prepareMessage.ranges,
++                                                                             prepareMessage.isIncremental,
++                                                                             isGlobal);
                      MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
                      break;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/118bea59/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java
index d2b6ab6,0000000..b849cf8
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@@ -1,409 -1,0 +1,409 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.repair;
 +
 +import java.net.InetAddress;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.LinkedBlockingQueue;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import com.google.common.collect.ImmutableMap;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Lists;
 +import com.google.common.util.concurrent.*;
 +import org.apache.commons.lang3.time.DurationFormatUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.cql3.statements.SelectStatement;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.QueryState;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.tracing.TraceKeyspace;
 +import org.apache.cassandra.tracing.TraceState;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.transport.messages.ResultMessage;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.UUIDGen;
 +import org.apache.cassandra.utils.WrappedRunnable;
 +import org.apache.cassandra.utils.progress.ProgressEvent;
 +import org.apache.cassandra.utils.progress.ProgressEventNotifier;
 +import org.apache.cassandra.utils.progress.ProgressEventType;
 +import org.apache.cassandra.utils.progress.ProgressListener;
 +
 +public class RepairRunnable extends WrappedRunnable implements ProgressEventNotifier
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class);
 +
 +    private StorageService storageService;
 +    private final int cmd;
 +    private final RepairOption options;
 +    private final String keyspace;
 +
 +    private final List<ProgressListener> listeners = new ArrayList<>();
 +
 +    public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace)
 +    {
 +        this.storageService = storageService;
 +        this.cmd = cmd;
 +        this.options = options;
 +        this.keyspace = keyspace;
 +    }
 +
 +    @Override
 +    public void addProgressListener(ProgressListener listener)
 +    {
 +        listeners.add(listener);
 +    }
 +
 +    @Override
 +    public void removeProgressListener(ProgressListener listener)
 +    {
 +        listeners.remove(listener);
 +    }
 +
 +    protected void fireProgressEvent(String tag, ProgressEvent event)
 +    {
 +        for (ProgressListener listener : listeners)
 +        {
 +            listener.progress(tag, event);
 +        }
 +    }
 +
 +    protected void fireErrorAndComplete(String tag, int progressCount, int totalProgress, String message)
 +    {
 +        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message));
 +        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress));
 +    }
 +
 +    protected void runMayThrow() throws Exception
 +    {
 +        final TraceState traceState;
 +
 +        final String tag = "repair:" + cmd;
 +
 +        final AtomicInteger progress = new AtomicInteger();
 +        final int totalProgress = 3 + options.getRanges().size(); // calculate neighbors, validation, prepare for repair + number of ranges to repair
 +
 +        String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]);
 +        Iterable<ColumnFamilyStore> validColumnFamilies = storageService.getValidColumnFamilies(false, false, keyspace,
 +                                                                                                columnFamilies);
 +
 +        final long startTime = System.currentTimeMillis();
 +        String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace,
 +                                       options);
 +        logger.info(message);
 +        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 100, message));
 +        if (options.isTraced())
 +        {
 +            StringBuilder cfsb = new StringBuilder();
 +            for (ColumnFamilyStore cfs : validColumnFamilies)
 +                cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name);
 +
 +            UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
 +            traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies",
 +                                                                          cfsb.substring(2)));
 +            Tracing.traceRepair(message);
 +            traceState.enableActivityNotification(tag);
 +            for (ProgressListener listener : listeners)
 +                traceState.addProgressListener(listener);
 +            Thread queryThread = createQueryThread(cmd, sessionId);
 +            queryThread.setName("RepairTracePolling");
 +            queryThread.start();
 +        }
 +        else
 +        {
 +            traceState = null;
 +        }
 +
 +        final Set<InetAddress> allNeighbors = new HashSet<>();
 +        Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
 +        try
 +        {
 +            for (Range<Token> range : options.getRanges())
 +            {
 +                    Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range,
 +                                                                                  options.getDataCenters(),
 +                                                                                  options.getHosts());
 +                    rangeToNeighbors.put(range, neighbors);
 +                    allNeighbors.addAll(neighbors);
 +            }
 +            progress.incrementAndGet();
 +        }
 +        catch (IllegalArgumentException e)
 +        {
 +            logger.error("Repair failed:", e);
 +            fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage());
 +            return;
 +        }
 +
 +        // Validate columnfamilies
 +        List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
 +        try
 +        {
 +            Iterables.addAll(columnFamilyStores, validColumnFamilies);
 +            progress.incrementAndGet();
 +        }
 +        catch (IllegalArgumentException e)
 +        {
 +            fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage());
 +            return;
 +        }
 +
 +        String[] cfnames = new String[columnFamilyStores.size()];
 +        for (int i = 0; i < columnFamilyStores.size(); i++)
 +        {
 +            cfnames[i] = columnFamilyStores.get(i).name;
 +        }
 +
 +        final UUID parentSession = UUIDGen.getTimeUUID();
 +        SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options.getRanges());
 +        long repairedAt;
 +        try
 +        {
-             ActiveRepairService.instance.prepareForRepair(parentSession, allNeighbors, options, columnFamilyStores);
++            ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddress(), allNeighbors, options, columnFamilyStores);
 +            repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).getRepairedAt();
 +            progress.incrementAndGet();
 +        }
 +        catch (Throwable t)
 +        {
 +            SystemDistributedKeyspace.failParentRepair(parentSession, t);
 +            fireErrorAndComplete(tag, progress.get(), totalProgress, t.getMessage());
 +            return;
 +        }
 +
 +        // Set up RepairJob executor for this repair command.
 +        final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(),
 +                                                                                                                         Integer.MAX_VALUE,
 +                                                                                                                         TimeUnit.SECONDS,
 +                                                                                                                         new LinkedBlockingQueue<Runnable>(),
 +                                                                                                                         new NamedThreadFactory("Repair#" + cmd),
 +                                                                                                                         "internal"));
 +
 +        List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
 +        for (Range<Token> range : options.getRanges())
 +        {
 +            final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
 +                                                              range,
 +                                                              keyspace,
 +                                                              options.getParallelism(),
 +                                                              rangeToNeighbors.get(range),
 +                                                              repairedAt,
 +                                                              executor,
 +                                                              cfnames);
 +            if (session == null)
 +                continue;
 +            // After repair session completes, notify client its result
 +            Futures.addCallback(session, new FutureCallback<RepairSessionResult>()
 +            {
 +                public void onSuccess(RepairSessionResult result)
 +                {
 +                    /**
 +                     * If the success message below is modified, it must also be updated on
 +                     * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
 +                     * for backward-compatibility support.
 +                     */
 +                    String message = String.format("Repair session %s for range %s finished", session.getId(),
 +                                                   session.getRange().toString());
 +                    logger.info(message);
 +                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
 +                                                             progress.incrementAndGet(),
 +                                                             totalProgress,
 +                                                             message));
 +                }
 +
 +                public void onFailure(Throwable t)
 +                {
 +                    /**
 +                     * If the failure message below is modified, it must also be updated on
 +                     * {@link org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
 +                     * for backward-compatibility support.
 +                     */
 +                    String message = String.format("Repair session %s for range %s failed with error %s",
 +                                                   session.getId(), session.getRange().toString(), t.getMessage());
 +                    logger.error(message, t);
 +                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
 +                                                             progress.incrementAndGet(),
 +                                                             totalProgress,
 +                                                             message));
 +                }
 +            });
 +            futures.add(session);
 +        }
 +
 +        // After all repair sessions completes(successful or not),
 +        // run anticompaction if necessary and send finish notice back to client
 +        final Collection<Range<Token>> successfulRanges = new ArrayList<>();
 +        final AtomicBoolean hasFailure = new AtomicBoolean();
 +        final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures);
 +        ListenableFuture anticompactionResult = Futures.transform(allSessions, new AsyncFunction<List<RepairSessionResult>, Object>()
 +        {
 +            @SuppressWarnings("unchecked")
 +            public ListenableFuture apply(List<RepairSessionResult> results) throws Exception
 +            {
 +                // filter out null(=failed) results and get successful ranges
 +                for (RepairSessionResult sessionResult : results)
 +                {
 +                    if (sessionResult != null)
 +                    {
 +                        successfulRanges.add(sessionResult.range);
 +                    }
 +                    else
 +                    {
 +                        hasFailure.compareAndSet(false, true);
 +                    }
 +                }
 +                return ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
 +            }
 +        });
 +        Futures.addCallback(anticompactionResult, new FutureCallback<Object>()
 +        {
 +            public void onSuccess(Object result)
 +            {
 +                SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges);
 +                if (hasFailure.get())
 +                {
 +                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
 +                                                             "Some repair failed"));
 +                }
 +                else
 +                {
 +                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
 +                                                             "Repair completed successfully"));
 +                }
 +                repairComplete();
 +            }
 +
 +            public void onFailure(Throwable t)
 +            {
 +                fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
 +                SystemDistributedKeyspace.failParentRepair(parentSession, t);
 +                repairComplete();
 +            }
 +
 +            private void repairComplete()
 +            {
 +                String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
 +                                                                          true, true);
 +                String message = String.format("Repair command #%d finished in %s", cmd, duration);
 +                fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
 +                logger.info(message);
 +                if (options.isTraced() && traceState != null)
 +                {
 +                    for (ProgressListener listener : listeners)
 +                        traceState.removeProgressListener(listener);
 +                    // Because DebuggableThreadPoolExecutor#afterExecute and this callback
 +                    // run in a nondeterministic order (within the same thread), the
 +                    // TraceState may have been nulled out at this point. The TraceState
 +                    // should be traceState, so just set it without bothering to check if it
 +                    // actually was nulled out.
 +                    Tracing.instance.set(traceState);
 +                    Tracing.traceRepair(message);
 +                    Tracing.instance.stopSession();
 +                }
 +                executor.shutdownNow();
 +            }
 +        });
 +    }
 +
 +    private Thread createQueryThread(final int cmd, final UUID sessionId)
 +    {
 +        return new Thread(new WrappedRunnable()
 +        {
 +            // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
 +            // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
 +            public void runMayThrow() throws Exception
 +            {
 +                TraceState state = Tracing.instance.get(sessionId);
 +                if (state == null)
 +                    throw new Exception("no tracestate");
 +
 +                String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;";
 +                String query = String.format(format, TraceKeyspace.NAME, TraceKeyspace.EVENTS);
 +                SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement;
 +
 +                ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
 +                InetAddress source = FBUtilities.getBroadcastAddress();
 +
 +                HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new HashSet<>() };
 +                int si = 0;
 +                UUID uuid;
 +
 +                long tlast = System.currentTimeMillis(), tcur;
 +
 +                TraceState.Status status;
 +                long minWaitMillis = 125;
 +                long maxWaitMillis = 1000 * 1024L;
 +                long timeout = minWaitMillis;
 +                boolean shouldDouble = false;
 +
 +                while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED)
 +                {
 +                    if (status == TraceState.Status.IDLE)
 +                    {
 +                        timeout = shouldDouble ? Math.min(timeout * 2, maxWaitMillis) : timeout;
 +                        shouldDouble = !shouldDouble;
 +                    }
 +                    else
 +                    {
 +                        timeout = minWaitMillis;
 +                        shouldDouble = false;
 +                    }
 +                    ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000));
 +                    ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis()));
 +                    QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes,
 +                                                                                                                  tminBytes,
 +                                                                                                                  tmaxBytes));
 +                    ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options);
 +                    UntypedResultSet result = UntypedResultSet.create(rows.result);
 +
 +                    for (UntypedResultSet.Row r : result)
 +                    {
 +                        if (source.equals(r.getInetAddress("source")))
 +                            continue;
 +                        if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000)
 +                            seen[si].add(uuid);
 +                        if (seen[si == 0 ? 1 : 0].contains(uuid))
 +                            continue;
 +                        String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
 +                        fireProgressEvent("repair:" + cmd,
 +                                          new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message));
 +                    }
 +                    tlast = tcur;
 +
 +                    si = si == 0 ? 1 : 0;
 +                    seen[si].clear();
 +                }
 +            }
 +        });
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/118bea59/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 1ea5aaf,f8975f9..5d010f9
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -35,16 -33,21 +35,22 @@@ import com.google.common.util.concurren
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 -import org.apache.cassandra.concurrent.NamedThreadFactory;
+ import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.SystemKeyspace;
  import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.gms.ApplicationState;
+ import org.apache.cassandra.gms.EndpointState;
  import org.apache.cassandra.gms.FailureDetector;
  import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.gms.IFailureDetector;
+ import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+ import org.apache.cassandra.gms.IFailureDetectionEventListener;
+ import org.apache.cassandra.gms.VersionedValue;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.locator.TokenMetadata;
  import org.apache.cassandra.net.IAsyncCallbackWithFailure;
  import org.apache.cassandra.net.MessageIn;
@@@ -75,26 -78,31 +81,27 @@@ import org.apache.cassandra.utils.concu
   * The creation of a repair session is done through the submitRepairSession that
   * returns a future on the completion of that session.
   */
- public class ActiveRepairService
+ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
  {
 -    private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
 -    // singleton enforcement
 -    public static final ActiveRepairService instance = new ActiveRepairService();
 -
 -    public static final long UNREPAIRED_SSTABLE = 0;
 -
 -    private static final ThreadPoolExecutor executor;
 -    private boolean registeredForEndpointChanges = false;
 -
 -    static
 -    {
 -        executor = new JMXConfigurableThreadPoolExecutor(4,
 -                                                         60,
 -                                                         TimeUnit.SECONDS,
 -                                                         new LinkedBlockingQueue<Runnable>(),
 -                                                         new NamedThreadFactory("AntiEntropySessions"),
 -                                                         "internal");
 -    }
 -
 +    /**
 +     * @deprecated this statuses are from the previous JMX notification service,
 +     * which will be deprecated on 4.0. For statuses of the new notification
 +     * service, see {@link org.apache.cassandra.streaming.StreamEvent.ProgressEvent}
 +     */
 +    @Deprecated
      public static enum Status
      {
          STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED
      }
++    private boolean registeredForEndpointChanges = false;
 +
 +    public static CassandraVersion SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION = new CassandraVersion("2.2.1");
 +
 +    private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
 +    // singleton enforcement
 +    public static final ActiveRepairService instance = new ActiveRepairService(FailureDetector.instance, Gossiper.instance);
 +
 +    public static final long UNREPAIRED_SSTABLE = 0;
  
      /**
       * A map of active coordinator session.
@@@ -246,9 -252,10 +253,9 @@@
          return neighbors;
      }
  
-     public synchronized UUID prepareForRepair(UUID parentRepairSession, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores)
 -    public synchronized UUID prepareForRepair(InetAddress coordinator, Set<InetAddress> endpoints, Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores)
++    public synchronized UUID prepareForRepair(UUID parentRepairSession, InetAddress coordinator, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores)
      {
-         registerParentRepairSession(parentRepairSession, columnFamilyStores, options.getRanges(), options.isIncremental(), options.isGlobal());
 -        UUID parentRepairSession = UUIDGen.getTimeUUID();
 -        registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, ranges);
++        registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), options.isGlobal());
          final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
          final AtomicBoolean status = new AtomicBoolean(true);
          final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>());
@@@ -313,9 -317,36 +320,15 @@@
          return parentRepairSession;
      }
  
-     public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal)
 -    public synchronized void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges)
++    public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal)
      {
-         parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, isGlobal, System.currentTimeMillis()));
+         if (!registeredForEndpointChanges)
+         {
+             Gossiper.instance.register(this);
+             FailureDetector.instance.registerFailureDetectionEventListener(this);
+             registeredForEndpointChanges = true;
+         }
 -
 -        cleanupOldParentRepairSessions();
 -
 -        parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, System.currentTimeMillis()));
 -    }
 -
 -    /**
 -     * Cleans up old failed parent repair sessions - if it is 24h old, we remove it from the map
 -     */
 -    private void cleanupOldParentRepairSessions()
 -    {
 -        long currentTime = System.currentTimeMillis();
 -
 -        Set<UUID> expired = new HashSet<>();
 -        for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet())
 -        {
 -            ParentRepairSession session = entry.getValue();
 -            if (session.failed && currentTime - session.repairedAt > TimeUnit.HOURS.toMillis(24))
 -                expired.add(entry.getKey());
 -        }
 -        for (UUID remove : expired)
 -            parentRepairSessions.remove(remove);
++        parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, isGlobal, System.currentTimeMillis()));
      }
  
      public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID parentRepairSession)
@@@ -353,7 -394,13 +366,13 @@@
  
      public ParentRepairSession getParentRepairSession(UUID parentSessionId)
      {
-         return parentRepairSessions.get(parentSessionId);
+         ParentRepairSession session = parentRepairSessions.get(parentSessionId);
+         // this can happen if a node thinks that the coordinator was down, but that coordinator got back before noticing
+         // that it was down itself.
 -        if (session != null && session.failed)
++        if (session == null)
+             throw new RuntimeException("Parent repair session with id = " + parentSessionId + " has failed.");
+ 
+         return session;
      }
  
      public synchronized ParentRepairSession removeParentRepairSession(UUID parentSessionId)
@@@ -432,17 -465,26 +451,19 @@@
  
      public static class ParentRepairSession
      {
 -        public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
 -        public final Collection<Range<Token>> ranges;
 +        private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
 +        private final Collection<Range<Token>> ranges;
          public final Map<UUID, Set<String>> sstableMap = new HashMap<>();
-         private final long repairedAt;
 -        /**
 -         * used as fail time if failed is true
 -         */
 +        public final boolean isIncremental;
 +        public final boolean isGlobal;
+         public final long repairedAt;
+         public final InetAddress coordinator;
 -        /**
 -         * Used to mark a repair as failed - if the coordinator thinks that the repair is still ongoing and sends a
 -         * request, we need to fail the coordinator as well.
 -         */
 -        public final boolean failed;
  
-         public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal, long repairedAt)
 -        public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt, boolean failed)
++        public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal, long repairedAt)
          {
+             this.coordinator = coordinator;
              for (ColumnFamilyStore cfs : columnFamilyStores)
              {
 -
                  this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
                  sstableMap.put(cfs.metadata.cfId, new HashSet<String>());
              }
@@@ -492,12 -540,10 +513,13 @@@
              }
          }
  
 -        public ParentRepairSession asFailed()
 +        public long getRepairedAt()
          {
 -            return new ParentRepairSession(coordinator, Collections.<ColumnFamilyStore>emptyList(), Collections.<Range<Token>>emptyList(), System.currentTimeMillis(), true);
 +            if (isGlobal)
 +                return repairedAt;
 +            return ActiveRepairService.UNREPAIRED_SSTABLE;
          }
++
          @Override
          public String toString()
          {
@@@ -509,4 -555,61 +531,58 @@@
                      '}';
          }
      }
+ 
+     /*
+     If the coordinator node dies we should remove the parent repair session from the other nodes.
+     This uses the same notifications as we get in RepairSession
+      */
+     public void onJoin(InetAddress endpoint, EndpointState epState) {}
+     public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+     public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+     public void onAlive(InetAddress endpoint, EndpointState state) {}
+     public void onDead(InetAddress endpoint, EndpointState state) {}
+ 
+     public void onRemove(InetAddress endpoint)
+     {
+         convict(endpoint, Double.MAX_VALUE);
+     }
+ 
+     public void onRestart(InetAddress endpoint, EndpointState state)
+     {
+         convict(endpoint, Double.MAX_VALUE);
+     }
+ 
+     /**
+      * Something has happened to a remote node - if that node is a coordinator, we mark the parent repair session id as failed.
+      *
+      * The fail marker is kept in the map for 24h to make sure that if the coordinator does not agree
+      * that the repair failed, we need to fail the entire repair session
+      *
+      * @param ep  endpoint to be convicted
+      * @param phi the value of phi with which ep was convicted
+      */
+     public void convict(InetAddress ep, double phi)
+     {
+         // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+         if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold() || parentRepairSessions.isEmpty())
+             return;
+ 
+         Set<UUID> toRemove = new HashSet<>();
+ 
+         for (Map.Entry<UUID, ParentRepairSession> repairSessionEntry : parentRepairSessions.entrySet())
+         {
+             if (repairSessionEntry.getValue().coordinator.equals(ep))
+             {
+                 toRemove.add(repairSessionEntry.getKey());
+             }
+         }
+ 
+         if (!toRemove.isEmpty())
+         {
 -            logger.debug("Failing {} in parent repair sessions", toRemove);
++            logger.debug("Removing {} in parent repair sessions", toRemove);
+             for (UUID id : toRemove)
 -            {
 -                ParentRepairSession failed = parentRepairSessions.get(id);
 -                parentRepairSessions.replace(id, failed, failed.asFailed());
 -            }
++                parentRepairSessions.remove(id);
+         }
+     }
+ 
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/118bea59/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 0ec854a,6ec4c7b..8b9ca08
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@@ -196,10 -106,10 +196,10 @@@ public class LeveledCompactionStrategyT
          assertTrue(strategy.getSSTableCountPerLevel()[2] > 0);
  
          Range<Token> range = new Range<>(Util.token(""), Util.token(""));
 -        int gcBefore = keyspace.getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
 +        int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(System.currentTimeMillis());
          UUID parentRepSession = UUID.randomUUID();
-         ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false, true);
 -        ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range));
 -        RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), ksname, cfname, range);
++        ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, true);
 +        RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, range);
          Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);
          CompactionManager.instance.submitValidation(cfs, validator).get();
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/118bea59/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index e5c03b9,0000000..892ced1
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@@ -1,129 -1,0 +1,130 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.repair;
 +
 +import java.net.InetAddress;
 +import java.util.Arrays;
 +import java.util.HashSet;
 +import java.util.Set;
 +import java.util.UUID;
 +
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.config.KSMetaData;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.locator.SimpleStrategy;
 +import org.apache.cassandra.service.ActiveRepairService;
++import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.MerkleTree;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +public class LocalSyncTaskTest extends SchemaLoader
 +{
 +    private static final IPartitioner partirioner = Murmur3Partitioner.instance;
 +    public static final String KEYSPACE1 = "DifferencerTest";
 +    public static final String CF_STANDARD = "Standard1";
 +
 +    @BeforeClass
 +    public static void defineSchema() throws Exception
 +    {
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE1,
 +                                    SimpleStrategy.class,
 +                                    KSMetaData.optsWithRF(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
 +    }
 +
 +    /**
 +     * When there is no difference between the two trees, LocalSyncTask should return stats with 0 differences.
 +     */
 +    @Test
 +    public void testNoDifference() throws Throwable
 +    {
 +        final InetAddress ep1 = InetAddress.getByName("127.0.0.1");
 +        final InetAddress ep2 = InetAddress.getByName("127.0.0.1");
 +
 +        Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken());
 +        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", range);
 +
 +        MerkleTree tree1 = createInitialTree(desc);
 +        MerkleTree tree2 = createInitialTree(desc);
 +
 +        // difference the trees
 +        // note: we reuse the same endpoint which is bogus in theory but fine here
 +        TreeResponse r1 = new TreeResponse(ep1, tree1);
 +        TreeResponse r2 = new TreeResponse(ep2, tree2);
 +        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE);
 +        task.run();
 +
 +        assertEquals(0, task.get().numberOfDifferences);
 +    }
 +
 +    @Test
 +    public void testDifference() throws Throwable
 +    {
 +        Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken());
 +        UUID parentRepairSession = UUID.randomUUID();
 +        Keyspace keyspace = Keyspace.open(KEYSPACE1);
 +        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
 +
-         ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false, false);
++        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, false);
 +
 +        RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", range);
 +
 +        MerkleTree tree1 = createInitialTree(desc);
 +        MerkleTree tree2 = createInitialTree(desc);
 +
 +        // change a range in one of the trees
 +        Token token = partirioner.midpoint(range.left, range.right);
 +        tree1.invalidate(token);
 +        MerkleTree.TreeRange changed = tree1.get(token);
 +        changed.hash("non-empty hash!".getBytes());
 +
 +        Set<Range<Token>> interesting = new HashSet<>();
 +        interesting.add(changed);
 +
 +        // difference the trees
 +        // note: we reuse the same endpoint which is bogus in theory but fine here
 +        TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1);
 +        TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2);
 +        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE);
 +        task.run();
 +
 +        // ensure that the changed range was recorded
 +        assertEquals("Wrong differing ranges", interesting.size(), task.getCurrentStat().numberOfDifferences);
 +    }
 +
 +    private MerkleTree createInitialTree(RepairJobDesc desc)
 +    {
 +        MerkleTree tree = new MerkleTree(partirioner, desc.range, MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15));
 +        tree.init();
 +        for (MerkleTree.TreeRange r : tree.invalids())
 +        {
 +            r.ensureHashInitialised();
 +        }
 +        return tree;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/118bea59/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index b4066d7,26e5126..7793660
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@@ -232,7 -56,7 +232,7 @@@ public class ActiveRepairServiceTes
          Set<SSTableReader> original = store.getUnrepairedSSTables();
  
          UUID prsId = UUID.randomUUID();
-         ActiveRepairService.instance.registerParentRepairSession(prsId, Collections.singletonList(store), null, true, false);
 -        ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null);
++        ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, false);
          ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId);
  
          //add all sstables to parent repair session


Mime
View raw message