[ https://issues.apache.org/jira/browse/DRILL-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439872#comment-16439872 ]
ASF GitHub Bot commented on DRILL-6295:
---------------------------------------
Github user ilooner commented on a diff in the pull request:
https://github.com/apache/drill/pull/1208#discussion_r181851002
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
---
@@ -262,68 +280,124 @@ public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
}
@Override
- public void execute(Partitioner part) throws IOException {
+ public void execute(Partitioner part) throws IOException, InterruptedException {
part.flushOutgoingBatches(isLastBatch, schemaChanged);
}
}
/**
- * Helper class to wrap Runnable with customized naming
- * Exception handling
+ * Helper class to wrap Runnable with cancellation and waiting for completion support
*
*/
- private static class CustomRunnable implements Runnable {
+ private static class PartitionerTask implements Runnable {
+
+ private enum STATE {
+ NEW,
+ COMPLETING,
+ NORMAL,
+ EXCEPTIONAL,
+ CANCELLED,
+ INTERRUPTING,
+ INTERRUPTED
+ }
+
+ private final AtomicReference<STATE> state;
+ private final AtomicReference<Thread> runner;
+ private final PartitionerDecorator partitionerDecorator;
+ private final AtomicInteger count;
- private final String parentThreadName;
- private final CountDownLatch latch;
private final GeneralExecuteIface iface;
- private final Partitioner part;
+ private final Partitioner partitioner;
private CountDownLatchInjection testCountDownLatch;
- private volatile IOException exp;
+ private volatile ExecutionException exception;
- public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
- final Partitioner part, CountDownLatchInjection testCountDownLatch) {
- this.parentThreadName = parentThreadName;
- this.latch = latch;
+ public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
+ state = new AtomicReference<>(STATE.NEW);
+ runner = new AtomicReference<>();
+ this.partitionerDecorator = partitionerDecorator;
this.iface = iface;
- this.part = part;
+ this.partitioner = partitioner;
+ this.count = count;
this.testCountDownLatch = testCountDownLatch;
}
@Override
public void run() {
- // Test only - Pause until interrupted by fragment thread
+ final Thread thread = Thread.currentThread();
+ Preconditions.checkState(runner.compareAndSet(null, thread),
+ "PartitionerTask can be executed only once.");
+ if (state.get() == STATE.NEW) {
+ final String name = thread.getName();
+ thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId()));
+ final OperatorStats localStats = partitioner.getStats();
+ localStats.clear();
+ localStats.startProcessing();
+ ExecutionException executionException = null;
+ try {
+ // Test only - Pause until interrupted by fragment thread
+ testCountDownLatch.await();
+ iface.execute(partitioner);
+ } catch (InterruptedException e) {
+ if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
+ logger.warn("Partitioner Task interrupted during the run", e);
+ }
+ } catch (Throwable t) {
+ executionException = new ExecutionException(t);
+ }
+ if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
+ if (executionException == null) {
+ localStats.stopProcessing();
+ state.lazySet(STATE.NORMAL);
+ } else {
+ exception = executionException;
+ state.lazySet(STATE.EXCEPTIONAL);
+ }
+ }
+ if (count.decrementAndGet() == 0) {
+ LockSupport.unpark(partitionerDecorator.thread);
+ }
+ thread.setName(name);
+ }
+ runner.set(null);
+ while (state.get() == STATE.INTERRUPTING) {
+ Thread.yield();
+ }
+ // Clear interrupt flag
try {
- testCountDownLatch.await();
- } catch (final InterruptedException e) {
- logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
+ Thread.sleep(0);
--- End diff --
Could we use Thread.interrupted() instead? Its Javadoc states that it clears the interrupted status of the current thread, which is all we need here, and unlike Thread.sleep(0) it avoids an unnecessary yield on some JVM implementations.
```
/**
* Tests whether the current thread has been interrupted. The
* <i>interrupted status</i> of the thread is cleared by this method. In
* other words, if this method were to be called twice in succession, the
* second call would return false (unless the current thread were
* interrupted again, after the first call had cleared its interrupted
* status and before the second call had examined it).
*
* <p>A thread interruption ignored because a thread was not alive
* at the time of the interrupt will be reflected by this method
* returning false.
*
* @return <code>true</code> if the current thread has been interrupted;
* <code>false</code> otherwise.
* @see #isInterrupted()
* @revised 6.0
*/
```
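To illustrate the suggestion: Thread.interrupted() both reports and resets the current thread's interrupt flag, so a second call in a row returns false. The sketch below (the class and helper names are hypothetical, not part of the PR) leaves a stale interrupt on the current thread, as a cancelled task might on a pooled worker, and clears it without sleeping or catching an exception.

```java
public class InterruptFlagDemo {

  // Hypothetical helper: clears any pending interrupt on the current thread
  // and reports whether one was set. Thread.interrupted() tests and resets
  // the flag in a single call.
  static boolean clearInterruptFlag() {
    return Thread.interrupted();
  }

  public static void main(String[] args) {
    // Simulate a stale interrupt left on a pooled worker thread by a cancelled task.
    Thread.currentThread().interrupt();

    boolean first = clearInterruptFlag();   // true: the flag was set
    boolean second = clearInterruptFlag();  // false: the first call cleared it
    System.out.println(first + " " + second);                    // prints "true false"
    System.out.println(Thread.currentThread().isInterrupted());  // prints "false"
  }
}
```

By contrast, the Thread.sleep(0) approach in the diff only clears the flag indirectly, by relying on sleep throwing InterruptedException, whose Javadoc says the interrupted status is cleared when that exception is thrown.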
> PartitionerDecorator may close partitioners while CustomRunnable are active during query cancellation
> -----------------------------------------------------------------------------------------------------
>
> Key: DRILL-6295
> URL: https://issues.apache.org/jira/browse/DRILL-6295
> Project: Apache Drill
> Issue Type: Bug
> Reporter: Vlad Rozov
> Assignee: Vlad Rozov
> Priority: Critical
> Fix For: 1.14.0
>
>
> During query cancellation, if {{PartitionerDecorator.executeMethodLogic()}} is active (waiting on the {{latch}}), the wait is interrupted and the {{Futures}} are cancelled, but there is no guarantee that all {{CustomRunnable}} instances terminate before {{PartitionerDecorator.executeMethodLogic()}} returns.
> On exit, both incoming and outgoing batches are cleared, which in turn clears the underlying {{Vectors}} and {{DrillBufs}}.
> Because a {{CustomRunnable}} may still execute after the incoming/outgoing batches are cleared, this eventually leads to access of unallocated memory and a JVM crash.
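The diff under review addresses this by having each task decrement a shared counter and unpark the fragment thread when the count reaches zero (count.decrementAndGet() and LockSupport.unpark(...) in PartitionerTask.run()). Below is a minimal sketch of that wait-for-all pattern, assuming a plain ExecutorService; AwaitAllTasksSketch and runAll are hypothetical names, and this is not Drill's actual PartitionerDecorator code. The key property is that runAll cannot return, so the caller cannot clear shared batches, while any task is still running.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public class AwaitAllTasksSketch {

  // Hypothetical helper: runs every task on the pool and returns only after
  // all of them have finished, whether they completed normally or threw.
  public static void runAll(ExecutorService pool, Runnable[] tasks) {
    final Thread waiter = Thread.currentThread();
    final AtomicInteger remaining = new AtomicInteger(tasks.length);
    for (Runnable task : tasks) {
      pool.execute(() -> {
        try {
          task.run();
        } finally {
          // The last task to finish wakes the waiting thread.
          if (remaining.decrementAndGet() == 0) {
            LockSupport.unpark(waiter);
          }
        }
      });
    }
    // park() can return spuriously or because the waiter was interrupted,
    // so re-check the counter instead of trusting a single wake-up.
    // If the waiter is interrupted, park() returns immediately and this loop
    // spins until the tasks finish; it still never returns early.
    while (remaining.get() > 0) {
      LockSupport.park();
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    AtomicInteger finished = new AtomicInteger();
    Runnable[] tasks = new Runnable[8];
    for (int i = 0; i < tasks.length; i++) {
      tasks[i] = () -> {
        try {
          Thread.sleep(10); // stand-in for flushing one partition's outgoing batch
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        finished.incrementAndGet();
      };
    }
    runAll(pool, tasks);
    // Only now would it be safe to release buffers shared with the tasks.
    System.out.println("finished tasks: " + finished.get()); // prints "finished tasks: 8"
    pool.shutdown();
  }
}
```

The finally block guarantees the decrement even when a task throws; otherwise one failing task would leave the waiter parked forever. A real implementation would also cancel or interrupt the tasks on query cancellation so that the wait stays short, which is what the STATE transitions in the diff are for.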
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)