cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stefa...@apache.org
Subject [10/10] cassandra git commit: Merge branch 'cassandra-3.9' into trunk
Date Fri, 29 Jul 2016 07:34:18 GMT
Merge branch 'cassandra-3.9' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bdaa53de
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bdaa53de
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bdaa53de

Branch: refs/heads/trunk
Commit: bdaa53de4ed938e451cba5cf2df7fa66ef00c153
Parents: cf68431 8e775ea
Author: Stefania Alborghetti <stefania.alborghetti@datastax.com>
Authored: Fri Jul 29 15:30:57 2016 +0800
Committer: Stefania Alborghetti <stefania.alborghetti@datastax.com>
Committed: Fri Jul 29 15:31:21 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 bin/cqlsh.py                                    |  2 +-
 .../cassandra/concurrent/StageManager.java      | 41 ++++----------
 .../cassandra/tracing/ExpiredTraceState.java    | 10 ++++
 .../apache/cassandra/tracing/TraceState.java    |  8 +++
 .../cassandra/tracing/TraceStateImpl.java       | 59 +++++++++++++++++++-
 .../apache/cassandra/tracing/TracingImpl.java   | 49 ++++++++++++----
 .../apache/cassandra/tracing/TracingTest.java   |  3 +
 8 files changed, 128 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdaa53de/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdaa53de/bin/cqlsh.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdaa53de/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/concurrent/StageManager.java
index badc527,64abf00..84b8da6
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@@ -59,7 -59,7 +59,7 @@@ public class StageManage
          stages.put(Stage.TRACING, tracingExecutor());
      }
  
--    private static ExecuteOnlyExecutor tracingExecutor()
++    private static LocalAwareExecutorService tracingExecutor()
      {
          RejectedExecutionHandler reh = new RejectedExecutionHandler()
          {
@@@ -68,13 -68,13 +68,13 @@@
                  MessagingService.instance().incrementDroppedMessages(MessagingService.Verb._TRACE);
              }
          };
--        return new ExecuteOnlyExecutor(1,
--                                       1,
--                                       KEEPALIVE,
--                                       TimeUnit.SECONDS,
--                                       new ArrayBlockingQueue<Runnable>(1000),
--                                       new NamedThreadFactory(Stage.TRACING.getJmxName()),
--                                       reh);
++        return new TracingExecutor(1,
++                                   1,
++                                   KEEPALIVE,
++                                   TimeUnit.SECONDS,
++                                   new ArrayBlockingQueue<Runnable>(1000),
++                                   new NamedThreadFactory(Stage.TRACING.getJmxName()),
++                                   reh);
      }
  
      private static JMXEnabledThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
@@@ -112,13 -112,17 +112,12 @@@
          }
      }
  
 -    public final static Runnable NO_OP_TASK = () -> {};
 -
      /**
--     * A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the
-      * tracing stage.  See CASSANDRA-1123 for background.
 -     * tracing stage.  See CASSANDRA-1123 for background. We allow submitting NO_OP tasks, to allow
 -     * a final wait on pending trace events since typically the tracing executor is single-threaded, see
 -     * CASSANDRA-11465.
++     * The executor used for tracing.
       */
--    private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService
++    private static class TracingExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService
      {
--        public ExecuteOnlyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
++        public TracingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
          {
              super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
          }
@@@ -133,23 -137,28 +132,5 @@@
          {
              execute(command);
          }
--
--        @Override
--        public Future<?> submit(Runnable task)
--        {
 -            if (task.equals(NO_OP_TASK))
 -            {
 -                assert getMaximumPoolSize() == 1 : "Cannot wait for pending tasks if running more than 1 thread";
 -                return super.submit(task);
 -            }
--            throw new UnsupportedOperationException();
--        }
--
--        @Override
--        public <T> Future<T> submit(Runnable task, T result)
--        {
--            throw new UnsupportedOperationException();
--        }
--
--        @Override
--        public <T> Future<T> submit(Callable<T> task)
--        {
--            throw new UnsupportedOperationException();
--        }
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdaa53de/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
index fbe2c33,bc8d5dd..9230d38
--- a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
+++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
@@@ -42,4 -42,9 +42,14 @@@ class ExpiredTraceState extends TraceSt
      {
          delegate.traceImpl(message);
      }
+ 
+     protected void waitForPendingEvents()
+     {
+         delegate.waitForPendingEvents();
+     }
++
++    TraceState getDelegate()
++    {
++        return delegate;
++    }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdaa53de/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/TraceState.java
index 5365d09,ec2bc9e..b4eff6b
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@@ -179,6 -181,8 +181,11 @@@ public abstract class TraceState implem
  
      protected abstract void traceImpl(String message);
  
 -    protected abstract void waitForPendingEvents();
++    protected void waitForPendingEvents()
++    {
++        // if tracing events are asynchronous, then you can use this method to wait for them to complete
++    }
+ 
      public boolean acquireReference()
      {
          while (true)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdaa53de/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/TraceStateImpl.java
index 113ebb7,e2d3a68..55e8389
--- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
+++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
@@@ -19,7 -19,12 +19,16 @@@ package org.apache.cassandra.tracing
  
  import java.net.InetAddress;
  import java.util.Collections;
++import java.util.Set;
  import java.util.UUID;
 -import java.util.concurrent.ExecutionException;
++import java.util.concurrent.CompletableFuture;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.Future;
+ import java.util.concurrent.TimeUnit;
++import java.util.concurrent.TimeoutException;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
@@@ -35,6 -41,10 +45,12 @@@ import org.apache.cassandra.utils.Wrapp
   */
  public class TraceStateImpl extends TraceState
  {
+     private static final Logger logger = LoggerFactory.getLogger(TraceStateImpl.class);
+     private static final int WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS =
+       Integer.valueOf(System.getProperty("cassandra.wait_for_tracing_events_timeout_secs", "1"));
+ 
++    private final Set<Future<?>> pendingFutures = ConcurrentHashMap.newKeySet();
++
      public TraceStateImpl(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType)
      {
          super(coordinator, sessionId, traceType);
@@@ -46,17 -56,45 +62,54 @@@
          final int elapsed = elapsed();
  
          executeMutation(TraceKeyspace.makeEventMutation(sessionIdBytes, message, elapsed, threadName, ttl));
+         if (logger.isTraceEnabled())
+             logger.trace("Adding <{}> to trace events", message);
      }
  
-     static void executeMutation(final Mutation mutation)
+     /**
 -     * Post a no-op event to the TRACING stage, so that we can be sure that any previous mutations
 -     * have at least been applied to one replica. This works because the tracking executor only
 -     * has one thread in its pool, see {@link StageManager#tracingExecutor()}.
++     * Wait on submitted futures
+      */
+     protected void waitForPendingEvents()
      {
-         StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
+         if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0)
+             return;
+ 
+         try
+         {
+             if (logger.isTraceEnabled())
 -                logger.trace("Waiting for up to {} seconds for trace events to complete",
 -                             WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS);
++                logger.trace("Waiting for up to {} seconds for {} trace events to complete",
++                             +WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, pendingFutures.size());
+ 
 -            StageManager.getStage(Stage.TRACING).submit(StageManager.NO_OP_TASK)
 -                        .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, TimeUnit.SECONDS);
++            CompletableFuture.allOf(pendingFutures.toArray(new CompletableFuture<?>[pendingFutures.size()]))
++                             .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, TimeUnit.SECONDS);
++        }
++        catch (TimeoutException ex)
++        {
++            if (logger.isTraceEnabled())
++                logger.trace("Failed to wait for tracing events to complete in {} seconds",
++                             WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS);
+         }
+         catch (Throwable t)
+         {
+             JVMStabilityInspector.inspectThrowable(t);
 -            logger.debug("Failed to wait for tracing events to complete: {}", t);
++            logger.error("Got exception whilst waiting for tracing events to complete", t);
+         }
+     }
+ 
 -    static void executeMutation(final Mutation mutation)
++
++    void executeMutation(final Mutation mutation)
+     {
 -        StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
++        CompletableFuture<Void> fut = CompletableFuture.runAsync(new WrappedRunnable()
          {
              protected void runMayThrow()
              {
                  mutateWithCatch(mutation);
              }
--        });
++        }, StageManager.getStage(Stage.TRACING));
++
++        boolean ret = pendingFutures.add(fut);
++        if (!ret)
++            logger.warn("Failed to insert pending future, tracing synchronization may not work");
      }
  
      static void mutateWithCatch(Mutation mutation)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdaa53de/src/java/org/apache/cassandra/tracing/TracingImpl.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/TracingImpl.java
index 52ac183,52ac183..4f69584
--- a/src/java/org/apache/cassandra/tracing/TracingImpl.java
+++ b/src/java/org/apache/cassandra/tracing/TracingImpl.java
@@@ -28,10 -28,10 +28,6 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.concurrent.StageManager;
  import org.apache.cassandra.utils.WrappedRunnable;
  
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
--
  
  /**
   * A trace session context. Able to track and store trace sessions. A session is usually a user initiated query, and may
@@@ -39,31 -39,31 +35,62 @@@
   */
  class TracingImpl extends Tracing
  {
--    private static final Logger logger = LoggerFactory.getLogger(TracingImpl.class);
--
      public void stopSessionImpl() {
--        TraceState state = get();
++        final TraceStateImpl state = getStateImpl();
++        if (state == null)
++            return;
++
          int elapsed = state.elapsed();
          ByteBuffer sessionId = state.sessionIdBytes;
          int ttl = state.ttl;
--        TraceStateImpl.executeMutation(TraceKeyspace.makeStopSessionMutation(sessionId, elapsed, ttl));
++
++        state.executeMutation(TraceKeyspace.makeStopSessionMutation(sessionId, elapsed, ttl));
      }
  
      public TraceState begin(final String request, final InetAddress client, final Map<String, String> parameters)
      {
          assert isTracing();
  
--        final TraceState state = get();
++        final TraceStateImpl state = getStateImpl();
++        assert state != null;
++
          final long startedAt = System.currentTimeMillis();
          final ByteBuffer sessionId = state.sessionIdBytes;
          final String command = state.traceType.toString();
          final int ttl = state.ttl;
  
--        TraceStateImpl.executeMutation(TraceKeyspace.makeStartSessionMutation(sessionId, client, parameters, request, startedAt, command, ttl));
--
++        state.executeMutation(TraceKeyspace.makeStartSessionMutation(sessionId, client, parameters, request, startedAt, command, ttl));
          return state;
      }
  
++    /**
++     * Convert the abstract tracing state to its implementation.
++     *
++     * Expired states are not put in the sessions but the check is for extra safety.
++     *
++     * @return the state converted to its implementation, or null
++     */
++    private TraceStateImpl getStateImpl()
++    {
++        TraceState state = get();
++        if (state == null)
++            return null;
++
++        if (state instanceof ExpiredTraceState)
++        {
++            ExpiredTraceState expiredTraceState = (ExpiredTraceState) state;
++            state = expiredTraceState.getDelegate();
++        }
++
++        if (state instanceof TraceStateImpl)
++        {
++            return (TraceStateImpl)state;
++        }
++
++        assert false : "TracingImpl states should be of type TraceStateImpl";
++        return null;
++    }
++
      @Override
      protected TraceState newTraceState(InetAddress coordinator, UUID sessionId, TraceType traceType)
      {


Mime
View raw message