drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-6295) PartitionerDecorator may close partitioners while CustomRunnable are active during query cancellation
Date Mon, 16 Apr 2018 23:37:00 GMT

    [ https://issues.apache.org/jira/browse/DRILL-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440163#comment-16440163 ]

ASF GitHub Bot commented on DRILL-6295:
---------------------------------------

Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1208#discussion_r181915654
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java ---
    @@ -262,68 +280,122 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
         }
     
         @Override
    -    public void execute(Partitioner part) throws IOException {
    +    public void execute(Partitioner part) throws IOException, InterruptedException {
           part.flushOutgoingBatches(isLastBatch, schemaChanged);
         }
       }
     
       /**
    -   * Helper class to wrap Runnable with customized naming
    -   * Exception handling
    +   * Helper class to wrap Runnable with cancellation and waiting for completion support
        *
        */
    -  private static class CustomRunnable implements Runnable {
    +  private static class PartitionerTask implements Runnable {
    +
    +    private enum STATE {
    +      NEW,
    +      COMPLETING,
    +      NORMAL,
    +      EXCEPTIONAL,
    +      CANCELLED,
    +      INTERRUPTING,
    +      INTERRUPTED
    +    }
    +
    +    private final AtomicReference<STATE> state;
    +    private final AtomicReference<Thread> runner;
    +    private final PartitionerDecorator partitionerDecorator;
    +    private final AtomicInteger count;
     
    -    private final String parentThreadName;
    -    private final CountDownLatch latch;
         private final GeneralExecuteIface iface;
    -    private final Partitioner part;
    +    private final Partitioner partitioner;
         private CountDownLatchInjection testCountDownLatch;
     
    -    private volatile IOException exp;
    +    private volatile ExecutionException exception;
     
    -    public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
    -        final Partitioner part, CountDownLatchInjection testCountDownLatch) {
    -      this.parentThreadName = parentThreadName;
    -      this.latch = latch;
    +    public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
    +      state = new AtomicReference<>(STATE.NEW);
    +      runner = new AtomicReference<>();
    +      this.partitionerDecorator = partitionerDecorator;
           this.iface = iface;
    -      this.part = part;
    +      this.partitioner = partitioner;
    +      this.count = count;
           this.testCountDownLatch = testCountDownLatch;
         }
     
         @Override
         public void run() {
    -      // Test only - Pause until interrupted by fragment thread
    -      try {
    -        testCountDownLatch.await();
    -      } catch (final InterruptedException e) {
    -        logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
    -      }
    -
    -      final Thread currThread = Thread.currentThread();
    -      final String currThreadName = currThread.getName();
    -      final OperatorStats localStats = part.getStats();
    -      try {
    -        final String newThreadName = parentThreadName + currThread.getId();
    -        currThread.setName(newThreadName);
    +      final Thread thread = Thread.currentThread();
    +      if (runner.compareAndSet(null, thread)) {
    +        final String name = thread.getName();
    +        thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId()));
    +        final OperatorStats localStats = partitioner.getStats();
             localStats.clear();
             localStats.startProcessing();
    -        iface.execute(part);
    -      } catch (IOException e) {
    -        exp = e;
    -      } finally {
    -        localStats.stopProcessing();
    -        currThread.setName(currThreadName);
    -        latch.countDown();
    +        ExecutionException executionException = null;
    +        try {
    +          // Test only - Pause until interrupted by fragment thread
    +          testCountDownLatch.await();
    +          if (state.get() == STATE.NEW) {
    +            iface.execute(partitioner);
    +          }
    +        } catch (InterruptedException e) {
    +          if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
    +            logger.warn("Partitioner Task interrupted during the run", e);
    +          }
    +        } catch (Throwable t) {
    +          executionException = new ExecutionException(t);
    +        } finally {
    +          if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
    +            if (executionException == null) {
    +              localStats.stopProcessing();
    +              state.lazySet(STATE.NORMAL);
    +            } else {
    +              exception = executionException;
    +              state.lazySet(STATE.EXCEPTIONAL);
    +            }
    +          }
    +          if (count.decrementAndGet() == 0) {
    +            LockSupport.unpark(partitionerDecorator.thread);
    +          }
    +          thread.setName(name);
    +          while (state.get() == STATE.INTERRUPTING) {
    +            Thread.yield();
    +          }
    +          // Clear interrupt flag
    +          Thread.interrupted();
    +        }
    +      }
    +    }
    +
    +    void cancel(boolean mayInterruptIfRunning) {
    +      Preconditions.checkState(Thread.currentThread() == partitionerDecorator.thread,
    +          String.format("PartitionerTask can be cancelled only from the main %s thread", partitionerDecorator.thread.getName()));
    +      if (runner.compareAndSet(null, partitionerDecorator.thread)) {
    +        if (partitionerDecorator.executor instanceof ThreadPoolExecutor) {
    +          ((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);
    +        }
    +        count.decrementAndGet();
    --- End diff --
    
    The main PartitionerDecorator thread may never get unparked:
    
    1. Two PartitionerTasks are created.
    2. Before either PartitionerTask sets its runner, the main PartitionerDecorator thread enters await() and cancels each PartitionerTask.
    3. For each PartitionerTask, the cancel method executes:
       ```
       if (partitionerDecorator.executor instanceof ThreadPoolExecutor) {
         ((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);
       }
       count.decrementAndGet();
       ```
    4. The count goes to zero, but the main PartitionerDecorator thread still calls LockSupport.park() in await().
    5. LockSupport.unpark() was not called when the count reached zero, so the main PartitionerDecorator thread can remain parked.
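
One way to close the gap described in the steps above, sketched in isolation. This is not the PR's code: `CompletionCounter`, `taskDone()`, and `remaining()` are hypothetical names, and the PR's actual await() is not visible in this diff. The sketch assumes that both the completion path and the cancel path go through the same decrement, and that the waiter re-checks the count before every park.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical helper (not part of Drill): tracks outstanding tasks for one waiting thread. */
final class CompletionCounter {
  private final AtomicInteger count;
  private final Thread waiter;

  CompletionCounter(int tasks, Thread waiter) {
    this.count = new AtomicInteger(tasks);
    this.waiter = waiter;
  }

  /** Called once per task, on normal completion or on cancellation, from any thread. */
  void taskDone() {
    if (count.decrementAndGet() == 0) {
      // unpark() issued before park() leaves a permit, so a later park() returns at once.
      LockSupport.unpark(waiter);
    }
  }

  /** Blocks until every task is done; intended to be called on the waiter thread. */
  void await() {
    // Re-check the count before each park(): the count may already be zero on entry,
    // and park() is allowed to return spuriously.
    while (count.get() > 0) {
      LockSupport.park(this);
    }
  }

  int remaining() {
    return count.get();
  }
}
```

With this shape, the scenario in steps 1 to 5 ends with the count at zero and await() returning without parking, because the cancel path uses the same `taskDone()` as the run path. Either guard alone (the unpark on reaching zero, or the count check before parking) would be enough for that scenario; the sketch keeps both.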


> PartitionerDecorator may close partitioners while CustomRunnable are active during query cancellation
> -----------------------------------------------------------------------------------------------------
>
>                 Key: DRILL-6295
>                 URL: https://issues.apache.org/jira/browse/DRILL-6295
>             Project: Apache Drill
>          Issue Type: Bug
>            Reporter: Vlad Rozov
>            Assignee: Vlad Rozov
>            Priority: Critical
>             Fix For: 1.14.0
>
>
> During query cancellation, if {{PartitionerDecorator.executeMethodLogic()}} is active (waiting on the {{latch}}), the wait is interrupted and the {{Futures}} are cancelled, but there is no guarantee that all {{CustomRunnable}} instances terminate before {{PartitionerDecorator.executeMethodLogic()}} returns. On exit, both incoming and outgoing batches are cleared, which clears the underlying {{Vectors}} and {{DrillBufs}}. This eventually causes access to unallocated memory and a JVM crash, because a {{CustomRunnable}} may execute after the incoming/outgoing batches are cleared.
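
The root cause in the description above rests on a general property of `java.util.concurrent`: `Future.cancel(true)` interrupts the running task but returns without waiting for it to stop. A minimal standalone sketch of that behaviour follows. `CancelDoesNotWait` and its methods are hypothetical names for illustration, and nothing here is taken from Drill.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical demo: a cancelled task can still be running after cancel() returns. */
final class CancelDoesNotWait {

  /** Returns whether the task was still unfinished right after Future.cancel(true) returned. */
  static boolean stillRunningAfterCancel() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    CountDownLatch finished = new CountDownLatch(1);

    Future<?> future = executor.submit(() -> {
      started.countDown();
      // Stands in for work that does not stop promptly when interrupted.
      awaitIgnoringInterrupts(release);
      finished.countDown();
    });

    awaitIgnoringInterrupts(started);
    future.cancel(true);                              // interrupts the task, does not join it
    boolean stillRunning = finished.getCount() == 1;  // task cannot finish before release below

    // Only after an explicit wait like this is it safe to free resources the task uses.
    release.countDown();
    awaitIgnoringInterrupts(finished);
    executor.shutdown();
    return stillRunning;
  }

  /** Waits on the latch, swallowing interrupts (acceptable for a demo only). */
  private static void awaitIgnoringInterrupts(CountDownLatch latch) {
    while (true) {
      try {
        latch.await();
        return;
      } catch (InterruptedException e) {
        // Deliberately keep waiting.
      }
    }
  }
}
```

The `finished` latch plays the role of the explicit wait for completion that the issue asks for: buffers shared with a task should be released only after that task has signalled that it is done.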



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
