cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Date Fri, 08 May 2015 22:54:11 GMT
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a583f70e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a583f70e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a583f70e

Branch: refs/heads/trunk
Commit: a583f70eeeada31478a55b5774ef222c55956220
Parents: 40a7e86 93478ab
Author: Yuki Morishita <yukim@apache.org>
Authored: Fri May 8 17:53:37 2015 -0500
Committer: Yuki Morishita <yukim@apache.org>
Committed: Fri May 8 17:53:37 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/SystemKeyspace.java | 31 +++++++
 .../cassandra/repair/AnticompactionTask.java    | 98 ++++++++++++++++++++
 .../repair/RepairMessageVerbHandler.java        | 31 ++++---
 .../apache/cassandra/repair/RepairRunnable.java | 34 +++----
 .../cassandra/service/ActiveRepairService.java  | 27 +++---
 .../apache/cassandra/utils/SemanticVersion.java |  2 +-
 7 files changed, 177 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a583f70e/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a583f70e/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index c9d77f4,5beb709..6b4bb73
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -673,6 -546,37 +673,37 @@@ public final class SystemKeyspac
      }
  
      /**
+      * Get release version for given endpoint.
+      * If release version is unknown, then this returns null.
+      *
+      * @param ep endpoint address to check
+      * @return Release version or null if version is unknown.
+      */
+     public static SemanticVersion getReleaseVersion(InetAddress ep)
+     {
+         try
+         {
+             if (FBUtilities.getBroadcastAddress().equals(ep))
+             {
+                 return new SemanticVersion(FBUtilities.getReleaseVersionString());
+             }
+             String req = "SELECT release_version FROM system.%s WHERE peer=?";
 -            UntypedResultSet result = executeInternal(String.format(req, PEERS_CF), ep);
++            UntypedResultSet result = executeInternal(String.format(req, PEERS), ep);
+             if (result != null && result.one().has("release_version"))
+             {
+                 return new SemanticVersion(result.one().getString("release_version"));
+             }
+             // version is unknown
+             return null;
+         }
+         catch (IllegalArgumentException e)
+         {
+             // version string cannot be parsed
+             return null;
+         }
+     }
+ 
+     /**
       * One of three things will happen if you try to read the system keyspace:
       * 1. files are present and you can read them: great
       * 2. no files are there: great (new node is assumed)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a583f70e/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 0000000,e505d91..d1bbb82
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@@ -1,0 -1,93 +1,98 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.repair;
+ 
+ import java.net.InetAddress;
++import java.util.Collection;
+ import java.util.UUID;
+ import java.util.concurrent.TimeUnit;
+ 
+ import com.google.common.util.concurrent.AbstractFuture;
+ 
+ import org.apache.cassandra.db.SystemKeyspace;
++import org.apache.cassandra.dht.Range;
++import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+ import org.apache.cassandra.net.MessageIn;
+ import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.repair.messages.AnticompactionRequest;
+ import org.apache.cassandra.utils.SemanticVersion;
+ 
+ public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable
+ {
+     /*
+      * Version that anticompaction response is not supported up to.
+      * If Cassandra version is more than this, we need to wait for anticompaction response.
+      */
+     private static final SemanticVersion VERSION_CHECKER = new SemanticVersion("2.1.5");
+ 
+     private final UUID parentSession;
+     private final InetAddress neighbor;
++    private final Collection<Range<Token>> successfulRanges;
+ 
 -    public AnticompactionTask(UUID parentSession, InetAddress neighbor)
++    public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges)
+     {
+         this.parentSession = parentSession;
+         this.neighbor = neighbor;
++        this.successfulRanges = successfulRanges;
+     }
+ 
+     public void run()
+     {
 -        AnticompactionRequest acr = new AnticompactionRequest(parentSession);
++        AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges);
+         SemanticVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor);
+         if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0)
+         {
+             MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+         }
+         else
+         {
+             MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+             // immediately return after sending request
+             set(neighbor);
+         }
+     }
+ 
+     /**
+      * Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage.
+      */
+     public static class AnticompactionCallback implements IAsyncCallbackWithFailure
+     {
+         final AnticompactionTask task;
+ 
+         public AnticompactionCallback(AnticompactionTask task)
+         {
+             this.task = task;
+         }
+ 
+         public void response(MessageIn msg)
+         {
+             task.set(msg.from);
+         }
+ 
+         public boolean isLatencyForSnitch()
+         {
+             return false;
+         }
+ 
+         public void onFailure(InetAddress from)
+         {
+             task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a583f70e/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 4984d3a,60b2243..e7613c5
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@@ -17,20 -17,15 +17,12 @@@
   */
  package org.apache.cassandra.repair;
  
--import java.util.ArrayList;
--import java.util.Collections;
--import java.util.List;
- import java.util.Set;
--import java.util.UUID;
--import java.util.concurrent.Future;
++import java.util.*;
  
  import com.google.common.base.Predicate;
 +import com.google.common.collect.Sets;
- 
- import org.apache.cassandra.dht.Bounds;
- import org.apache.cassandra.dht.Range;
- import org.apache.cassandra.dht.Token;
- import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.MoreExecutors;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -38,14 -33,18 +30,17 @@@ import org.apache.cassandra.config.Sche
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.dht.Bounds;
  import org.apache.cassandra.dht.LocalPartitioner;
+ import org.apache.cassandra.dht.Range;
+ import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.io.sstable.SSTableReader;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.net.IVerbHandler;
  import org.apache.cassandra.net.MessageIn;
  import org.apache.cassandra.net.MessageOut;
  import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.repair.messages.*;
  import org.apache.cassandra.service.ActiveRepairService;
--import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.Pair;
  
  /**
@@@ -128,9 -112,17 +123,17 @@@ public class RepairMessageVerbHandler i
                      break;
  
                  case ANTICOMPACTION_REQUEST:
 -                    logger.debug("Got anticompaction request");
                      AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload;
 -                    ListenableFuture<?> compactionDone = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
 +                    logger.debug("Got anticompaction request {}", anticompactionRequest);
-                     ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession, anticompactionRequest.successfulRanges);
++                    ListenableFuture<?> compactionDone = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession, anticompactionRequest.successfulRanges);
+                     compactionDone.addListener(new Runnable()
+                     {
+                         @Override
+                         public void run()
+                         {
+                             MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
+                         }
+                     }, MoreExecutors.sameThreadExecutor());
                      break;
  
                  default:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a583f70e/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java
index 2900794,0000000..28511db
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@@ -1,399 -1,0 +1,399 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.repair;
 +
 +import java.net.InetAddress;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.LinkedBlockingQueue;
 +import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import com.google.common.collect.ImmutableMap;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Lists;
 +import com.google.common.util.concurrent.*;
 +import org.apache.commons.lang3.time.DurationFormatUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.cql3.statements.SelectStatement;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.QueryState;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.tracing.TraceKeyspace;
 +import org.apache.cassandra.tracing.TraceState;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.transport.messages.ResultMessage;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.UUIDGen;
 +import org.apache.cassandra.utils.WrappedRunnable;
 +import org.apache.cassandra.utils.progress.ProgressEvent;
 +import org.apache.cassandra.utils.progress.ProgressEventNotifier;
 +import org.apache.cassandra.utils.progress.ProgressEventType;
 +import org.apache.cassandra.utils.progress.ProgressListener;
 +
 +public class RepairRunnable extends WrappedRunnable implements ProgressEventNotifier
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class);
 +
 +    private StorageService storageService;
 +    private final int cmd;
 +    private final RepairOption options;
 +    private final String keyspace;
 +
 +    private final List<ProgressListener> listeners = new ArrayList<>();
 +
 +    public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace)
 +    {
 +        this.storageService = storageService;
 +        this.cmd = cmd;
 +        this.options = options;
 +        this.keyspace = keyspace;
 +    }
 +
 +    @Override
 +    public void addProgressListener(ProgressListener listener)
 +    {
 +        listeners.add(listener);
 +    }
 +
 +    @Override
 +    public void removeProgressListener(ProgressListener listener)
 +    {
 +        listeners.remove(listener);
 +    }
 +
 +    protected void fireProgressEvent(String tag, ProgressEvent event)
 +    {
 +        for (ProgressListener listener : listeners)
 +        {
 +            listener.progress(tag, event);
 +        }
 +    }
 +
 +    protected void fireErrorAndComplete(String tag, int progressCount, int totalProgress, String message)
 +    {
 +        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message));
 +        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress));
 +    }
 +
 +    protected void runMayThrow() throws Exception
 +    {
 +        final TraceState traceState;
 +
 +        final String tag = "repair:" + cmd;
 +
 +        final AtomicInteger progress = new AtomicInteger();
 +        final int totalProgress = 3 + options.getRanges().size(); // calculate neighbors, validation, prepare for repair + number of ranges to repair
 +
 +        String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]);
 +        Iterable<ColumnFamilyStore> validColumnFamilies = storageService.getValidColumnFamilies(false, false, keyspace,
 +                                                                                                columnFamilies);
 +
 +        final long startTime = System.currentTimeMillis();
 +        String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace,
 +                                       options);
 +        logger.info(message);
 +        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 100, message));
 +        if (options.isTraced())
 +        {
 +            StringBuilder cfsb = new StringBuilder();
 +            for (ColumnFamilyStore cfs : validColumnFamilies)
 +                cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name);
 +
 +            UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
 +            traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies",
 +                                                                          cfsb.substring(2)));
 +            Tracing.traceRepair(message);
 +            traceState.enableActivityNotification(tag);
 +            for (ProgressListener listener : listeners)
 +                traceState.addProgressListener(listener);
 +            Thread queryThread = createQueryThread(cmd, sessionId);
 +            queryThread.setName("RepairTracePolling");
 +            queryThread.start();
 +        }
 +        else
 +        {
 +            traceState = null;
 +        }
 +
 +        final Set<InetAddress> allNeighbors = new HashSet<>();
 +        Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
 +        try
 +        {
 +            for (Range<Token> range : options.getRanges())
 +            {
 +                    Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range,
 +                                                                                  options.getDataCenters(),
 +                                                                                  options.getHosts());
 +                    rangeToNeighbors.put(range, neighbors);
 +                    allNeighbors.addAll(neighbors);
 +            }
 +            progress.incrementAndGet();
 +        }
 +        catch (IllegalArgumentException e)
 +        {
 +            logger.error("Repair failed:", e);
 +            fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage());
 +            return;
 +        }
 +
 +        // Validate columnfamilies
 +        List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
 +        try
 +        {
 +            Iterables.addAll(columnFamilyStores, validColumnFamilies);
 +            progress.incrementAndGet();
 +        }
 +        catch (IllegalArgumentException e)
 +        {
 +            fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage());
 +            return;
 +        }
 +
 +        String[] cfnames = new String[columnFamilyStores.size()];
 +        for (int i = 0; i < columnFamilyStores.size(); i++)
 +        {
 +            cfnames[i] = columnFamilyStores.get(i).name;
 +        }
 +
 +        final UUID parentSession = UUIDGen.getTimeUUID();
 +        SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options.getRanges());
 +        long repairedAt;
 +        try
 +        {
 +            ActiveRepairService.instance.prepareForRepair(parentSession, allNeighbors, options, columnFamilyStores);
 +            repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt;
 +            progress.incrementAndGet();
 +        }
 +        catch (Throwable t)
 +        {
 +            SystemDistributedKeyspace.failParentRepair(parentSession, t);
 +            fireErrorAndComplete(tag, progress.get(), totalProgress, t.getMessage());
 +            return;
 +        }
 +
 +        // Set up RepairJob executor for this repair command.
 +        final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(),
 +                                                                                                                         Integer.MAX_VALUE,
 +                                                                                                                         TimeUnit.SECONDS,
 +                                                                                                                         new LinkedBlockingQueue<Runnable>(),
 +                                                                                                                         new NamedThreadFactory("Repair#" + cmd),
 +                                                                                                                         "internal"));
 +
 +        List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
 +        for (Range<Token> range : options.getRanges())
 +        {
 +            final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
 +                                                              range,
 +                                                              keyspace,
 +                                                              options.getParallelism(),
 +                                                              rangeToNeighbors.get(range),
 +                                                              repairedAt,
 +                                                              executor,
 +                                                              cfnames);
 +            if (session == null)
 +                continue;
 +            // After repair session completes, notify client its result
 +            Futures.addCallback(session, new FutureCallback<RepairSessionResult>()
 +            {
 +                public void onSuccess(RepairSessionResult result)
 +                {
 +                    String message = String.format("Repair session %s for range %s finished", session.getId(),
 +                                                   session.getRange().toString());
 +                    logger.info(message);
 +                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
 +                                                             progress.incrementAndGet(),
 +                                                             totalProgress,
 +                                                             message));
 +                }
 +
 +                public void onFailure(Throwable t)
 +                {
 +                    String message = String.format("Repair session %s for range %s failed with error %s",
 +                                                   session.getId(), session.getRange().toString(), t.getMessage());
 +                    logger.error(message, t);
 +                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
 +                                                             progress.incrementAndGet(),
 +                                                             totalProgress,
 +                                                             message));
 +                }
 +            });
 +            futures.add(session);
 +        }
 +
 +        // After all repair sessions completes(successful or not),
 +        // run anticompaction if necessary and send finish notice back to client
++        final Collection<Range<Token>> successfulRanges = new ArrayList<>();
++        final AtomicBoolean hasFailure = new AtomicBoolean();
 +        final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures);
-         Futures.addCallback(allSessions, new FutureCallback<List<RepairSessionResult>>()
++        ListenableFuture anticompactionResult = Futures.transform(allSessions, new AsyncFunction<List<RepairSessionResult>, Object>()
 +        {
-             public void onSuccess(List<RepairSessionResult> result)
++            @SuppressWarnings("unchecked")
++            public ListenableFuture apply(List<RepairSessionResult> results) throws Exception
 +            {
-                 boolean hasFailure = false;
 +                // filter out null(=failed) results and get successful ranges
-                 Collection<Range<Token>> successfulRanges = new ArrayList<>();
-                 for (RepairSessionResult sessionResult : result)
++                for (RepairSessionResult sessionResult : results)
 +                {
 +                    if (sessionResult != null)
 +                    {
 +                        successfulRanges.add(sessionResult.range);
 +                    }
 +                    else
 +                    {
-                         hasFailure = true;
++                        hasFailure.compareAndSet(false, true);
 +                    }
 +                }
-                 try
-                 {
-                     ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
-                     SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges);
-                 }
-                 catch (Exception e)
-                 {
-                     logger.error("Error in incremental repair", e);
-                     SystemDistributedKeyspace.failParentRepair(parentSession, e);
-                 }
-                 if (hasFailure)
++                return ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
++            }
++        });
++        Futures.addCallback(anticompactionResult, new FutureCallback<Object>()
++        {
++            public void onSuccess(Object result)
++            {
++                SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges);
++                if (hasFailure.get())
 +                {
 +                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
 +                                                             "Some repair failed"));
 +                }
 +                else
 +                {
 +                    fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
 +                                                             "Repair completed successfully"));
 +                }
 +                repairComplete();
 +            }
 +
 +            public void onFailure(Throwable t)
 +            {
 +                fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage()));
 +                SystemDistributedKeyspace.failParentRepair(parentSession, t);
 +                repairComplete();
 +            }
 +
 +            private void repairComplete()
 +            {
 +                String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
 +                                                                          true, true);
 +                String message = String.format("Repair command #%d finished in %s", cmd, duration);
 +                fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message));
 +                logger.info(message);
 +                if (options.isTraced() && traceState != null)
 +                {
 +                    for (ProgressListener listener : listeners)
 +                        traceState.removeProgressListener(listener);
 +                    // Because DebuggableThreadPoolExecutor#afterExecute and this callback
 +                    // run in a nondeterministic order (within the same thread), the
 +                    // TraceState may have been nulled out at this point. The TraceState
 +                    // should be traceState, so just set it without bothering to check if it
 +                    // actually was nulled out.
 +                    Tracing.instance.set(traceState);
 +                    Tracing.traceRepair(message);
 +                    Tracing.instance.stopSession();
 +                }
 +                executor.shutdownNow();
 +            }
 +        });
 +    }
 +
 +    private Thread createQueryThread(final int cmd, final UUID sessionId)
 +    {
 +        return new Thread(new WrappedRunnable()
 +        {
 +            // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
 +            // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
 +            public void runMayThrow() throws Exception
 +            {
 +                TraceState state = Tracing.instance.get(sessionId);
 +                if (state == null)
 +                    throw new Exception("no tracestate");
 +
 +                String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;";
 +                String query = String.format(format, TraceKeyspace.NAME, TraceKeyspace.EVENTS);
 +                SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement;
 +
 +                ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
 +                InetAddress source = FBUtilities.getBroadcastAddress();
 +
 +                HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new HashSet<>() };
 +                int si = 0;
 +                UUID uuid;
 +
 +                long tlast = System.currentTimeMillis(), tcur;
 +
 +                TraceState.Status status;
 +                long minWaitMillis = 125;
 +                long maxWaitMillis = 1000 * 1024L;
 +                long timeout = minWaitMillis;
 +                boolean shouldDouble = false;
 +
 +                while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED)
 +                {
 +                    if (status == TraceState.Status.IDLE)
 +                    {
 +                        timeout = shouldDouble ? Math.min(timeout * 2, maxWaitMillis) : timeout;
 +                        shouldDouble = !shouldDouble;
 +                    }
 +                    else
 +                    {
 +                        timeout = minWaitMillis;
 +                        shouldDouble = false;
 +                    }
 +                    ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000));
 +                    ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis()));
 +                    QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes,
 +                                                                                                                  tminBytes,
 +                                                                                                                  tmaxBytes));
 +                    ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options);
 +                    UntypedResultSet result = UntypedResultSet.create(rows.result);
 +
 +                    for (UntypedResultSet.Row r : result)
 +                    {
 +                        if (source.equals(r.getInetAddress("source")))
 +                            continue;
 +                        if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000)
 +                            seen[si].add(uuid);
 +                        if (seen[si == 0 ? 1 : 0].contains(uuid))
 +                            continue;
 +                        String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
 +                        fireProgressEvent("repair:" + cmd,
 +                                          new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message));
 +                    }
 +                    tlast = tcur;
 +
 +                    si = si == 0 ? 1 : 0;
 +                    seen[si].clear();
 +                }
 +            }
 +        });
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a583f70e/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index a6ebe89,8d3563c..d350f4e
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -28,12 -27,9 +28,9 @@@ import java.util.concurrent.atomic.Atom
  import com.google.common.collect.ImmutableMap;
  import com.google.common.collect.Multimap;
  import com.google.common.collect.Sets;
- import com.google.common.util.concurrent.ListeningExecutorService;
- import com.google.common.util.concurrent.MoreExecutors;
--
  import com.google.common.util.concurrent.Futures;
  import com.google.common.util.concurrent.ListenableFuture;
++import com.google.common.util.concurrent.ListeningExecutorService;
  import com.google.common.util.concurrent.MoreExecutors;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -52,14 -49,15 +49,14 @@@ import org.apache.cassandra.net.IAsyncC
  import org.apache.cassandra.net.MessageIn;
  import org.apache.cassandra.net.MessageOut;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.repair.*;
 -import org.apache.cassandra.repair.messages.PrepareMessage;
 -import org.apache.cassandra.repair.messages.RepairMessage;
 -import org.apache.cassandra.repair.messages.SyncComplete;
 -import org.apache.cassandra.repair.messages.ValidationComplete;
++import org.apache.cassandra.repair.AnticompactionTask;
 +import org.apache.cassandra.repair.RepairJobDesc;
 +import org.apache.cassandra.repair.RepairParallelism;
 +import org.apache.cassandra.repair.RepairSession;
 +import org.apache.cassandra.repair.messages.*;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.UUIDGen;
  import org.apache.cassandra.utils.concurrent.Ref;
--
  import org.apache.cassandra.utils.concurrent.Refs;
  
  /**
@@@ -312,19 -325,29 +309,19 @@@ public class ActiveRepairServic
       *
       * @param parentSession Parent session ID
       * @param neighbors Repair participants (not including self)
 -     * @param doAntiCompaction true if repair session needs anti compaction
 -     * @throws InterruptedException
 -     * @throws ExecutionException
 +     * @param successfulRanges Ranges that repaired successfully
-      * @throws InterruptedException
-      * @throws ExecutionException
       */
-     public synchronized void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, Collection<Range<Token>> successfulRanges) throws InterruptedException, ExecutionException
 -    public synchronized ListenableFuture<?> finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException
++    public synchronized ListenableFuture finishParentSession(UUID parentSession, Set<InetAddress> neighbors, Collection<Range<Token>> successfulRanges)
      {
-         for (InetAddress neighbor : neighbors)
 -        if (doAntiCompaction)
--        {
-             AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges);
-             MessageOut<RepairMessage> req = acr.createMessage();
-             MessagingService.instance().sendOneWay(req, neighbor);
-         }
-         doAntiCompaction(parentSession, successfulRanges).get();
+             List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1);
+             for (InetAddress neighbor : neighbors)
+             {
 -                AnticompactionTask task = new AnticompactionTask(parentSession, neighbor);
++                AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
+                 tasks.add(task);
+                 task.run(); // 'run' is just sending message
+             }
 -            tasks.add(doAntiCompaction(parentSession));
++            tasks.add(doAntiCompaction(parentSession, successfulRanges));
+             return Futures.successfulAsList(tasks);
 -        }
 -        else
 -        {
 -            removeParentRepairSession(parentSession);
 -            return Futures.immediateFuture(null);
 -        }
      }
  
      public ParentRepairSession getParentRepairSession(UUID parentSessionId)


Mime
View raw message