drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-6295) PartitionerDecorator may close partitioners while CustomRunnable are active during query cancellation
Date Mon, 16 Apr 2018 19:03:00 GMT

    [ https://issues.apache.org/jira/browse/DRILL-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439872#comment-16439872 ]

ASF GitHub Bot commented on DRILL-6295:
---------------------------------------

Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181851002
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---
    @@ -262,68 +280,124 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
         }
     
         @Override
    -    public void execute(Partitioner part) throws IOException {
    +    public void execute(Partitioner part) throws IOException, InterruptedException {
           part.flushOutgoingBatches(isLastBatch, schemaChanged);
         }
       }
     
       /**
    -   * Helper class to wrap Runnable with customized naming
    -   * Exception handling
    +   * Helper class to wrap Runnable with cancellation and waiting for completion support
        *
        */
    -  private static class CustomRunnable implements Runnable {
    +  private static class PartitionerTask implements Runnable {
    +
    +    private enum STATE {
    +      NEW,
    +      COMPLETING,
    +      NORMAL,
    +      EXCEPTIONAL,
    +      CANCELLED,
    +      INTERRUPTING,
    +      INTERRUPTED
    +    }
    +
    +    private final AtomicReference<STATE> state;
    +    private final AtomicReference<Thread> runner;
    +    private final PartitionerDecorator partitionerDecorator;
    +    private final AtomicInteger count;
     
    -    private final String parentThreadName;
    -    private final CountDownLatch latch;
         private final GeneralExecuteIface iface;
    -    private final Partitioner part;
    +    private final Partitioner partitioner;
         private CountDownLatchInjection testCountDownLatch;
     
    -    private volatile IOException exp;
    +    private volatile ExecutionException exception;
     
    -    public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
    -        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
    -      this.parentThreadName = parentThreadName;
    -      this.latch = latch;
    +    public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
    +      state = new AtomicReference<>(STATE.NEW);
    +      runner = new AtomicReference<>();
    +      this.partitionerDecorator = partitionerDecorator;
           this.iface = iface;
    -      this.part = part;
    +      this.partitioner = partitioner;
    +      this.count = count;
           this.testCountDownLatch = testCountDownLatch;
         }
     
         @Override
         public void run() {
    -      // Test only - Pause until interrupted by fragment thread
    +      final Thread thread = Thread.currentThread();
    +      Preconditions.checkState(runner.compareAndSet(null, thread),
    +          "PartitionerTask can be executed only once.");
    +      if (state.get() == STATE.NEW) {
    +        final String name = thread.getName();
    +        thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId()));
    +        final OperatorStats localStats = partitioner.getStats();
    +        localStats.clear();
    +        localStats.startProcessing();
    +        ExecutionException executionException = null;
    +        try {
    +          // Test only - Pause until interrupted by fragment thread
    +          testCountDownLatch.await();
    +          iface.execute(partitioner);
    +        } catch (InterruptedException e) {
    +          if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
    +            logger.warn("Partitioner Task interrupted during the run", e);
    +          }
    +        } catch (Throwable t) {
    +          executionException = new ExecutionException(t);
    +        }
    +        if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
    +          if (executionException == null) {
    +            localStats.stopProcessing();
    +            state.lazySet(STATE.NORMAL);
    +          } else {
    +            exception = executionException;
    +            state.lazySet(STATE.EXCEPTIONAL);
    +          }
    +        }
    +        if (count.decrementAndGet() == 0) {
    +          LockSupport.unpark(partitionerDecorator.thread);
    +        }
    +        thread.setName(name);
    +      }
    +      runner.set(null);
    +      while (state.get() == STATE.INTERRUPTING) {
    +        Thread.yield();
    +      }
    +      // Clear interrupt flag
           try {
    -        testCountDownLatch.await();
    -      } catch (final InterruptedException e) {
    -        logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
    +        Thread.sleep(0);
    --- End diff --
    
    Could we use Thread.interrupted() instead? Its Javadoc states that it clears the interrupt flag, so it is a good
    alternative here. It also avoids the unnecessary yield that Thread.sleep(0) can cause on some JVM implementations.
    
    ```
        /**
         * Tests whether the current thread has been interrupted.  The
         * <i>interrupted status</i> of the thread is cleared by this method.  In
         * other words, if this method were to be called twice in succession, the
         * second call would return false (unless the current thread were
         * interrupted again, after the first call had cleared its interrupted
         * status and before the second call had examined it).
         *
         * <p>A thread interruption ignored because a thread was not alive
         * at the time of the interrupt will be reflected by this method
         * returning false.
         *
         * @return  <code>true</code> if the current thread has been interrupted;
         *          <code>false</code> otherwise.
         * @see #isInterrupted()
         * @revised 6.0
         */
    ```
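The suggestion can be checked in isolation. Below is a minimal sketch: the class `InterruptFlagDemo` and its helper `clearInterruptFlag` are hypothetical and not part of Drill. It shows that `Thread.interrupted()` reports and clears the calling thread's interrupt status in one call, so an immediate second call returns `false`.

```java
// Hypothetical demo class (not part of Drill) illustrating the review suggestion.
final class InterruptFlagDemo {

    /**
     * Clears the current thread's interrupt status and reports whether it was set.
     * Thread.interrupted() is static and always acts on the calling thread.
     */
    static boolean clearInterruptFlag() {
        return Thread.interrupted();
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt();                        // set the flag
        boolean wasSet = clearInterruptFlag();                     // true, and the flag is now clear
        boolean stillSet = Thread.currentThread().isInterrupted(); // false
        boolean again = clearInterruptFlag();                      // false: nothing left to clear
        System.out.println(wasSet + " " + stillSet + " " + again); // prints "true false false"
    }
}
```

Applied to the diff, the `Thread.sleep(0)` call could be replaced by a single `Thread.interrupted();` statement with its return value ignored. That replacement also removes the checked `InterruptedException` handling that `sleep` requires.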


> PartitionerDecorator may close partitioners while CustomRunnable are active during query cancellation
> -----------------------------------------------------------------------------------------------------
>
>                 Key: DRILL-6295
>                 URL: https://issues.apache.org/jira/browse/DRILL-6295
>             Project: Apache Drill
>          Issue Type: Bug
>            Reporter: Vlad Rozov
>            Assignee: Vlad Rozov
>            Priority: Critical
>             Fix For: 1.14.0
>
>
> During query cancellation, if {{PartitionerDecorator.executeMethodLogic()}} is active (waiting on the {{latch}}), the wait is
> interrupted and the {{Futures}} are cancelled, but nothing guarantees that all {{CustomRunnable}} instances have terminated
> before {{PartitionerDecorator.executeMethodLogic()}} returns. On exit, both the incoming and outgoing batches are cleared,
> which in turn clears the underlying {{Vectors}} and {{DrillBufs}}. Because a {{CustomRunnable}} may still execute after
> the incoming/outgoing batches are cleared, this eventually leads to unallocated memory being accessed and a JVM crash.
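The core of the bug is that the fragment thread releases the batches without knowing whether any task is still running. The sketch below shows one way to close that gap. It is loosely modeled on the counter-plus-`LockSupport.unpark()` approach in the diff above, but it is not Drill's actual code.

The pattern works in two parts:

* Every task counts itself down exactly once: either when it finishes, or (through `cancel()`) when it is cancelled before it starts.
* The waiting thread parks until the count reaches zero, and an interrupt does not cut that wait short.

The names `AwaitAllSketch`, `Task`, `awaitAll`, and `runCancelAndAwait` are hypothetical. A 10-second `Thread.sleep` stands in for partitioner work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical sketch (not Drill's code): cancel worker tasks, then wait until none can still run. */
final class AwaitAllSketch {

    enum State { NEW, RUNNING, INTERRUPTING, INTERRUPTED, DONE, CANCELLED }

    static final class Task implements Runnable {
        final AtomicReference<State> state = new AtomicReference<>(State.NEW);
        final AtomicReference<Thread> runner = new AtomicReference<>();
        final Runnable body;
        final AtomicInteger pending; // tasks that have not yet counted themselves down
        final Thread waiter;         // thread that frees shared state afterwards

        Task(Runnable body, AtomicInteger pending, Thread waiter) {
            this.body = body;
            this.pending = pending;
            this.waiter = waiter;
        }

        @Override
        public void run() {
            runner.set(Thread.currentThread()); // publish before RUNNING becomes visible
            if (!state.compareAndSet(State.NEW, State.RUNNING)) {
                return; // cancelled before start; cancel() already counted it down
            }
            try {
                body.run();
            } finally {
                // Spin while a canceller is inside interrupt(), then move to DONE.
                for (State s = state.get();
                     s == State.INTERRUPTING || !state.compareAndSet(s, State.DONE);
                     s = state.get()) {
                    Thread.yield();
                }
                Thread.interrupted(); // clear the flag before the pool reuses this thread
                if (pending.decrementAndGet() == 0) {
                    LockSupport.unpark(waiter); // the last task wakes the waiter
                }
            }
        }

        /** Called by the waiting thread on cancellation. */
        void cancel() {
            if (state.compareAndSet(State.NEW, State.CANCELLED)) {
                pending.decrementAndGet(); // will never run, so count it down here
            } else if (state.compareAndSet(State.RUNNING, State.INTERRUPTING)) {
                runner.get().interrupt();
                state.set(State.INTERRUPTED);
            }
        }
    }

    /** Parks until every task has counted down; interrupts do not end the wait. */
    static void awaitAll(AtomicInteger pending) {
        boolean interrupted = false;
        while (pending.get() > 0) {
            LockSupport.park();
            interrupted |= Thread.interrupted(); // clear it so park() blocks again
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the caller's interrupt status
        }
    }

    /** Submits n slow tasks, cancels them all, and waits until none can still run. */
    static boolean runCancelAndAwait(ExecutorService pool, int n) {
        AtomicInteger pending = new AtomicInteger(n);
        List<Task> tasks = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Task task = new Task(() -> {
                try {
                    Thread.sleep(10_000); // stand-in for partitioner work
                } catch (InterruptedException e) { /* cancelled: return early */ }
            }, pending, Thread.currentThread());
            tasks.add(task);
            pool.execute(task);
        }
        tasks.forEach(Task::cancel); // simulate query cancellation
        awaitAll(pending);
        // Safe point: incoming/outgoing batches could be cleared here.
        return tasks.stream().allMatch(
                t -> t.state.get() == State.DONE || t.state.get() == State.CANCELLED);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        System.out.println(runCancelAndAwait(pool, 4)); // prints true
        pool.shutdown();
    }
}
```

The `INTERRUPTING` state plays a role similar to the one it has in the diff. Suppose a task finishes while the canceller is still inside `interrupt()`. The task then waits for that call to complete before clearing its flag, so the interrupt cannot leak into whatever the pooled thread runs next.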



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
