drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-5963) Canceling a query hung in planning state, leaves the query in ENQUEUED state for ever.
Date Tue, 28 Nov 2017 06:55:04 GMT

    [ https://issues.apache.org/jira/browse/DRILL-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16268242#comment-16268242 ]

ASF GitHub Bot commented on DRILL-5963:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1051#discussion_r153405107
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java ---
    @@ -0,0 +1,439 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to you under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.work.foreman;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.ArrayListMultimap;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Sets;
    +import io.netty.buffer.ByteBuf;
    +import org.apache.drill.common.concurrent.ExtendedLatch;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.physical.base.FragmentRoot;
    +import org.apache.drill.exec.proto.BitControl;
    +import org.apache.drill.exec.proto.BitControl.PlanFragment;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.GeneralRPCProtos;
    +import org.apache.drill.exec.proto.UserBitShared.QueryId;
    +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
    +import org.apache.drill.exec.rpc.RpcException;
    +import org.apache.drill.exec.rpc.UserClientConnection;
    +import org.apache.drill.exec.rpc.control.Controller;
    +import org.apache.drill.exec.server.DrillbitContext;
    +import org.apache.drill.exec.testing.ControlsInjector;
    +import org.apache.drill.exec.testing.ControlsInjectorFactory;
    +import org.apache.drill.exec.work.EndpointListener;
    +import org.apache.drill.exec.work.WorkManager.WorkerBee;
    +import org.apache.drill.exec.work.fragment.FragmentExecutor;
    +import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
    +import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
    +import org.apache.drill.exec.work.fragment.RootFragmentManager;
    +
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.CountDownLatch;
    +
    +/**
    + * Is responsible for submitting query fragments for running (locally and remotely).
    + */
    +public class FragmentsRunner {
    +
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentsRunner.class);
    +  private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentsRunner.class);
    +
    +  private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000;
    +
    +  private final WorkerBee bee;
    +  private final UserClientConnection initiatingClient;
    +  private final DrillbitContext drillbitContext;
    +  private final Foreman foreman;
    +
    +  private List<PlanFragment> planFragments;
    +  private PlanFragment rootPlanFragment;
    +  private FragmentRoot rootOperator;
    +
    +  public FragmentsRunner(WorkerBee bee, UserClientConnection initiatingClient, DrillbitContext drillbitContext, Foreman foreman) {
    +    this.bee = bee;
    +    this.initiatingClient = initiatingClient;
    +    this.drillbitContext = drillbitContext;
    +    this.foreman = foreman;
    +  }
    +
    +  public WorkerBee getBee() {
    +    return bee;
    +  }
    +
    +  public void setPlanFragments(List<PlanFragment> planFragments) {
    +    this.planFragments = planFragments;
    +  }
    +
    +  public void setRootPlanFragment(PlanFragment rootPlanFragment) {
    +    this.rootPlanFragment = rootPlanFragment;
    +  }
    +
    +  public void setRootOperator(FragmentRoot rootOperator) {
    +    this.rootOperator = rootOperator;
    +  }
    +
    +  /**
    +   * Submits root and non-root fragments for running.
    +   * On success, moves the query to the running state.
    +   */
    +  public void submit() {
    +    try {
    +      assert planFragments != null;
    +      assert rootPlanFragment != null;
    +      assert rootOperator != null;
    +
    +      QueryId queryId = foreman.getQueryId();
    +      assert queryId == rootPlanFragment.getHandle().getQueryId();
    +
    +      QueryManager queryManager = foreman.getQueryManager();
    +
    +      try {
    +        drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
    +        drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
    +
    +        logger.debug("Submitting fragments to run.");
    +        // set up the root fragment first so we'll have incoming buffers available.
    +        setupRootFragment(rootPlanFragment, rootOperator);
    +        setupNonRootFragments(planFragments);
    +
    +      } catch (ExecutionSetupException e) {
    +        foreman.moveToState(QueryState.FAILED, e);
    +      }
    +
    +      foreman.moveToState(QueryState.RUNNING, null);
    +      logger.debug("Fragments running.");
    +    } finally {
    +      foreman.startProcessingEvents();
    +    }
    +
    +  }
    +
    +  /**
    +   * Set up the root fragment (which will run locally), and submit it for execution.
    +   *
    +   * @param rootFragment root fragment
    +   * @param rootOperator root operator
    +   * @throws ExecutionSetupException
    +   */
    +  private void setupRootFragment(final PlanFragment rootFragment, final FragmentRoot rootOperator) throws ExecutionSetupException {
    +
    +    QueryManager queryManager = foreman.getQueryManager();
    +    final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, foreman.getQueryContext(),
    +        initiatingClient, drillbitContext.getFunctionImplementationRegistry());
    +    final FragmentStatusReporter statusReporter = new FragmentStatusReporter(rootContext);
    +    final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, rootFragment, statusReporter, rootOperator);
    +    final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment, rootRunner, statusReporter);
    +
    +    queryManager.addFragmentStatusTracker(rootFragment, true);
    +
    +    // FragmentManager is setting buffer for FragmentContext
    +    if (rootContext.isBuffersDone()) {
    +      // if we don't have to wait for any incoming data, start the fragment runner.
    +      bee.addFragmentRunner(rootRunner);
    +    } else {
    +      // if we do, record the fragment manager in the workBus.
    +      drillbitContext.getWorkBus().addFragmentManager(fragmentManager);
    +    }
    +  }
    +
    +
    +  /**
    +   * Set up the non-root fragments for execution. Some may be local, and some may be remote.
    +   * Messages are sent immediately, so they may start returning data even before we complete this.
    +   *
    +   * @param fragments the fragments
    +   * @throws ForemanException
    +   */
    +  private void setupNonRootFragments(final Collection<PlanFragment> fragments) throws ForemanException {
    +    if (fragments.isEmpty()) {
    +      // nothing to do here
    +      return;
    +    }
    +    /*
    +     * We will send a single message to each endpoint, regardless of how many fragments will be
    +     * executed there. We need to start up the intermediate fragments first so that they will be
    +     * ready once the leaf fragments start producing data. To satisfy both of these, we will
    +     * make a pass through the fragments and put them into the remote maps according to their
    +     * leaf/intermediate state, as well as their target drillbit. Also filter the leaf/intermediate
    +     * fragments which are assigned to run on local Drillbit node (or Foreman node) into separate lists.
    +     *
    +     * This will help to schedule local
    +     */
    +    final Multimap<CoordinationProtos.DrillbitEndpoint, PlanFragment> remoteLeafFragmentMap = ArrayListMultimap.create();
    +    final List<PlanFragment> localLeafFragmentList = new ArrayList<>();
    +    final Multimap<CoordinationProtos.DrillbitEndpoint, PlanFragment> remoteIntFragmentMap = ArrayListMultimap.create();
    +    final List<PlanFragment> localIntFragmentList = new ArrayList<>();
    +
    +    final CoordinationProtos.DrillbitEndpoint localDrillbitEndpoint = drillbitContext.getEndpoint();
    +    // record all fragments for status purposes.
    +    for (final PlanFragment planFragment : fragments) {
    +
    +      if (logger.isTraceEnabled()) {
    +        logger.trace("Tracking intermediate remote node {} with data {}", planFragment.getAssignment(),
    +            planFragment.getFragmentJson());
    +      }
    +
    +      foreman.getQueryManager().addFragmentStatusTracker(planFragment, false);
    +
    +      if (planFragment.getLeafFragment()) {
    +        updateFragmentCollection(planFragment, localDrillbitEndpoint, localLeafFragmentList, remoteLeafFragmentMap);
    +      } else {
    +        updateFragmentCollection(planFragment, localDrillbitEndpoint, localIntFragmentList, remoteIntFragmentMap);
    +      }
    +    }
    +
    +    /*
    +     * We need to wait for the intermediates to be sent so that they'll be set up by the time
    +     * the leaves start producing data. We'll use this latch to wait for the responses.
    +     *
    +     * However, in order not to hang the process if any of the RPC requests fails, we always
    +     * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
    +     * know if any submissions did fail.
    +     */
    +    scheduleRemoteIntermediateFragments(remoteIntFragmentMap);
    +
    +    // Setup local intermediate fragments
    +    for (final PlanFragment fragment : localIntFragmentList) {
    +      startLocalFragment(fragment);
    +    }
    +
    +    injector.injectChecked(foreman.getQueryContext().getExecutionControls(), "send-fragments", ForemanException.class);
    +    /*
    +     * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
    +     * the regular sendListener event delivery.
    +     */
    +    for (final CoordinationProtos.DrillbitEndpoint ep : remoteLeafFragmentMap.keySet()) {
    +      sendRemoteFragments(ep, remoteLeafFragmentMap.get(ep), null, null);
    +    }
    +
    +    // Setup local leaf fragments
    +    for (final PlanFragment fragment : localLeafFragmentList) {
    +      startLocalFragment(fragment);
    +    }
    +  }
    +
    +  /**
    +   * Send all the remote fragments belonging to a single target drillbit in one request.
    +   *
    +   * @param assignment the drillbit assigned to these fragments
    +   * @param fragments the set of fragments
    +   * @param latch the countdown latch used to track the requests to all endpoints
    +   * @param fragmentSubmitFailures the submission failure counter used to track the requests to all endpoints
    +   */
    +  private void sendRemoteFragments(final CoordinationProtos.DrillbitEndpoint assignment, final Collection<PlanFragment> fragments,
    +                                   final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
    +    @SuppressWarnings("resource")
    +    final Controller controller = drillbitContext.getController();
    +    final BitControl.InitializeFragments.Builder fb = BitControl.InitializeFragments.newBuilder();
    --- End diff --
    
    Similarly, any reason to not just import `InitializeFragments` as in the original code?
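The partitioning pass in `setupNonRootFragments` can be sketched in plain JDK Java. `Fragment` and `FragmentBuckets` are hypothetical stand-ins for `PlanFragment` and the four collections in the diff; endpoints are modeled as strings, and a `LinkedHashMap` of lists replaces Guava's `ArrayListMultimap` so the sketch has no dependencies. `startOrder()` reproduces the order the diff follows: remote intermediates, local intermediates, remote leaves, local leaves.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for BitControl.PlanFragment: only the two properties
// the partitioning pass looks at.
final class Fragment {
  final String id;
  final String endpoint; // assigned drillbit, e.g. "host:port"
  final boolean leaf;

  Fragment(String id, String endpoint, boolean leaf) {
    this.id = id;
    this.endpoint = endpoint;
    this.leaf = leaf;
  }
}

// Four buckets: local/remote x leaf/intermediate. Remote buckets are grouped by
// endpoint so that one message per drillbit can be sent.
final class FragmentBuckets {
  final List<Fragment> localLeaf = new ArrayList<>();
  final List<Fragment> localIntermediate = new ArrayList<>();
  final Map<String, List<Fragment>> remoteLeaf = new LinkedHashMap<>();
  final Map<String, List<Fragment>> remoteIntermediate = new LinkedHashMap<>();

  static FragmentBuckets partition(List<Fragment> fragments, String localEndpoint) {
    FragmentBuckets b = new FragmentBuckets();
    for (Fragment f : fragments) {
      if (f.endpoint.equals(localEndpoint)) {
        (f.leaf ? b.localLeaf : b.localIntermediate).add(f);
      } else {
        Map<String, List<Fragment>> target = f.leaf ? b.remoteLeaf : b.remoteIntermediate;
        target.computeIfAbsent(f.endpoint, k -> new ArrayList<>()).add(f);
      }
    }
    return b;
  }

  // Intermediates start before leaves so they are ready when leaves produce data.
  List<String> startOrder() {
    List<String> order = new ArrayList<>();
    remoteIntermediate.values().forEach(l -> l.forEach(f -> order.add(f.id)));
    localIntermediate.forEach(f -> order.add(f.id));
    remoteLeaf.values().forEach(l -> l.forEach(f -> order.add(f.id)));
    localLeaf.forEach(f -> order.add(f.id));
    return order;
  }
}
```

Grouping remote fragments by endpoint is what lets the runner send a single message per drillbit regardless of how many fragments run there.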

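The "always count down, but count the failures" pattern described in the diff's comment about waiting for intermediate fragments can be sketched with a `CountDownLatch`. `IntermediateSubmitter` and the `send` predicate (simulating an RPC that reports success or failure) are hypothetical; scaling the 5000 ms `RPC_WAIT_IN_MSECS_PER_FRAGMENT` by the fragment count is an assumption about how that constant is used, since the diff excerpt stops before the wait.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

final class IntermediateSubmitter {
  // Mirrors RPC_WAIT_IN_MSECS_PER_FRAGMENT in the diff; multiplying it by the
  // fragment count is an assumption of this sketch.
  static final long WAIT_MS_PER_FRAGMENT = 5000;

  /** Endpoints whose submission failed, and whether the wait timed out. */
  static final class Outcome {
    final List<String> failedEndpoints;
    final boolean timedOut;

    Outcome(List<String> failedEndpoints, boolean timedOut) {
      this.failedEndpoints = failedEndpoints;
      this.timedOut = timedOut;
    }
  }

  /** Sends one request per endpoint in parallel and waits for every response. */
  static Outcome submitAll(List<String> endpoints, int fragmentCount, Predicate<String> send) {
    CountDownLatch latch = new CountDownLatch(endpoints.size());
    List<String> failures = new CopyOnWriteArrayList<>();
    ExecutorService rpc = Executors.newFixedThreadPool(Math.max(1, endpoints.size()));
    try {
      for (String ep : endpoints) {
        rpc.execute(() -> {
          try {
            if (!send.test(ep)) {
              failures.add(ep); // the RPC reported a failure
            }
          } catch (RuntimeException e) {
            failures.add(ep); // the RPC threw: record it as a failure
          } finally {
            latch.countDown(); // always count down so the waiter cannot hang
          }
        });
      }
      boolean allResponded;
      try {
        allResponded = latch.await(WAIT_MS_PER_FRAGMENT * fragmentCount, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        allResponded = false;
      }
      return new Outcome(new ArrayList<>(failures), !allResponded);
    } finally {
      rpc.shutdown();
    }
  }
}
```

The caller can then fail the query if `failedEndpoints` is non-empty or `timedOut` is true, instead of blocking forever on a lost response.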

> Canceling a query hung in planning state, leaves the query in ENQUEUED state for ever.
> --------------------------------------------------------------------------------------
>
>                 Key: DRILL-5963
>                 URL: https://issues.apache.org/jira/browse/DRILL-5963
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Execution - Flow
>    Affects Versions: 1.12.0
>         Environment: Drill 1.12.0-SNAPSHOT, commit: 4a718a0bd728ae02b502ac93620d132f0f6e1b6c
>            Reporter: Khurram Faraaz
>            Assignee: Arina Ielchiieva
>            Priority: Critical
>             Fix For: 1.13.0
>
>         Attachments: enqueued-2.png
>
>
> Canceling the query below, which hangs in the planning state, leaves the query in the ENQUEUED state forever.
> Here is the query that hangs in the planning state:
> {noformat}
> 0: jdbc:drill:schema=dfs.tmp> select 1 || ',' || 2 || ',' || 3 || ',' || 4 || ',' || 5 || ',' || 6 || ',' || 7 || ',' || 8 || ',' || 9 || ',' || 0 || ',' AS CSV_DATA from (values(1));
> +--+
> |  |
> +--+
> +--+
> No rows selected (304.291 seconds)
> {noformat}
> EXPLAIN PLAN for that query also hangs.
> {noformat}
> explain plan for select 1 || ',' || 2 || ',' || 3 || ',' || 4 || ',' || 5 || ',' || 6 || ',' || 7 || ',' || 8 || ',' || 9 || ',' || 0 || ',' AS CSV_DATA from (values(1));
> ...
> {noformat}
> The issues above point to the following problems:
> *1. A simple query with a reasonable number of concat functions hangs.*
> In reality the query does not hang; it just takes a very long time to execute. The root cause is that during planning the DrillFuncHolderExpr return type is used extensively to determine the matching function, matching type, etc. Although this type is retrieved via a [getter|https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java#L41], complex logic is actually executed beneath it, for example for the [concat function|https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/output/ConcatReturnTypeInference.java#L47]. Since a function's return type cannot change during the DrillFuncHolderExpr lifetime, it is safe to cache it.
> *2. There is no mechanism to cancel a query in the ENQUEUED state.*
> Currently Drill has no mechanism to cancel a query before it reaches the STARTING / RUNNING states. In addition, the ENQUEUED state actually covers two phases: PLANNING and ENQUEUED.
> Also, the mechanism for submitting a query to the queue is blocking, making the foreman wait until enqueueing is done. Making it non-blocking will avoid consuming threads that just sit idle in a busy system, and will also be important when we move to a real admission control solution.
> The following changes were made to address the issues above:
> a. Two new states were added: PREPARING (while the foreman is being initialized) and PLANNING (covering logical and/or physical planning).
> b. Query enqueuing was made non-blocking. Once the query is enqueued, the fragments runner is called to submit fragments locally and remotely.
> c. The ability to cancel a query during the PLANNING and ENQUEUED states was added.
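A hypothetical state machine illustrating changes a and c: PREPARING and PLANNING now precede ENQUEUED, and cancel is accepted in every non-terminal state. `QueryPhase` and `QueryLifecycle` are illustrative names; the transition table is an assumption of this sketch, not the Foreman's actual logic, and a real cancel would also have to stop planning or withdraw the query from the queue.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

enum QueryPhase {
  PREPARING, PLANNING, ENQUEUED, STARTING, RUNNING, COMPLETED, CANCELED, FAILED;

  boolean isTerminal() {
    return this == COMPLETED || this == CANCELED || this == FAILED;
  }
}

final class QueryLifecycle {
  // Normal forward transitions (assumed for this sketch).
  private static final Map<QueryPhase, Set<QueryPhase>> NEXT = new EnumMap<>(QueryPhase.class);

  static {
    NEXT.put(QueryPhase.PREPARING, EnumSet.of(QueryPhase.PLANNING));
    NEXT.put(QueryPhase.PLANNING, EnumSet.of(QueryPhase.ENQUEUED));
    NEXT.put(QueryPhase.ENQUEUED, EnumSet.of(QueryPhase.STARTING));
    NEXT.put(QueryPhase.STARTING, EnumSet.of(QueryPhase.RUNNING));
    NEXT.put(QueryPhase.RUNNING, EnumSet.of(QueryPhase.COMPLETED));
    // Every non-terminal phase may also fail or be canceled.
    for (QueryPhase p : QueryPhase.values()) {
      if (!p.isTerminal()) {
        NEXT.get(p).addAll(EnumSet.of(QueryPhase.CANCELED, QueryPhase.FAILED));
      }
    }
  }

  private QueryPhase phase = QueryPhase.PREPARING;

  synchronized QueryPhase phase() {
    return phase;
  }

  synchronized void moveTo(QueryPhase target) {
    Set<QueryPhase> allowed = NEXT.getOrDefault(phase, EnumSet.noneOf(QueryPhase.class));
    if (!allowed.contains(target)) {
      throw new IllegalStateException(phase + " -> " + target + " is not allowed");
    }
    phase = target;
  }

  /** Works in PREPARING, PLANNING and ENQUEUED too; returns false once terminal. */
  synchronized boolean cancel() {
    if (phase.isTerminal()) {
      return false;
    }
    phase = QueryPhase.CANCELED;
    return true;
  }
}
```

Splitting PLANNING out of ENQUEUED makes the cancel target explicit: a query stuck in planning is reported as PLANNING and can be moved straight to CANCELED.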



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
