drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vrozov <...@git.apache.org>
Subject [GitHub] drill pull request #1041: DRILL-5961: For long running queries (> 10 min) Dr...
Date Thu, 07 Dec 2017 05:29:20 GMT
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1041#discussion_r155433860
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
---
    @@ -74,83 +70,42 @@ public void statusUpdate(final FragmentStatus status) {
     
       public void addFragmentManager(final FragmentManager fragmentManager) {
         if (logger.isDebugEnabled()) {
    -      logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
    +      logger.debug("Fragment {} manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()),
fragmentManager);
         }
         final FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
    -      if (old != null) {
    -        throw new IllegalStateException(
    -            "Tried to set fragment manager when has already been set for the provided
fragment handle.");
    -    }
    -  }
    -
    -  public FragmentManager getFragmentManagerIfExists(final FragmentHandle handle) {
    -    synchronized (this) {
    -      return managers.get(handle);
    +    if (old != null) {
    +      throw new IllegalStateException(
    +          String.format("Manager {} for fragment {} already exists.", old, QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle())));
         }
       }
     
    -  public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException
{
    -    synchronized (this) {
    -      // Check if this was a recently finished (completed or cancelled) fragment.  If
so, throw away message.
    -      if (recentlyFinishedFragments.asMap().containsKey(handle)) {
    -        if (logger.isDebugEnabled()) {
    -          logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
    -        }
    -        return null;
    -      }
    -
    -      // since non-leaf fragments are sent first, it is an error condition if the manager
is unavailable.
    -      final FragmentManager m = managers.get(handle);
    -      if (m != null) {
    -        return m;
    -      }
    -    }
    -    throw new FragmentSetupException("Failed to receive plan fragment that was required
for id: "
    -        + QueryIdHelper.getQueryIdentifier(handle));
    +  public FragmentManager getFragmentManager(final FragmentHandle handle) {
    +    return managers.get(handle);
       }
     
       /**
    -   * Removes fragment manager (for the corresponding the handle) from the work event
bus. This method can be called
    -   * multiple times. The manager will be removed only once (the first call).
    -   * @param handle the handle to the fragment
    -   */
    -  public void removeFragmentManager(final FragmentHandle handle) {
    -    if (logger.isDebugEnabled()) {
    -      logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
    -    }
    -
    -    synchronized (this) {
    -      final FragmentManager manager = managers.get(handle);
    -      if (manager != null) {
    -        recentlyFinishedFragments.put(handle, 1);
    -        managers.remove(handle);
    -      } else {
    -        logger.warn("Fragment {} not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
    -      }
    -    }
    -  }
    -
    -  /**
    -   * Cancels and removes fragment manager (for the corresponding the handle) from the
work event bus, Currently, used
    -   * for fragments waiting on data (root and intermediate).
    +   * Optionally cancels and removes fragment manager (for the corresponding the handle)
from the work event bus. Currently, used
    +   * for fragments waiting on data (root and intermediate). This method can be called
multiple times. The manager will be removed
    +   * only once (the first call).
        * @param handle the handle to the fragment
    +   * @param cancel
        * @return if the fragment was found and removed from the event bus
        */
    -  public boolean cancelAndRemoveFragmentManagerIfExists(final FragmentHandle handle)
{
    -    if (logger.isDebugEnabled()) {
    -      logger.debug("Cancelling and removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
    -    }
    -
    -    synchronized (this) {
    -      final FragmentManager manager = managers.get(handle);
    -      if (manager == null) {
    -        return false;
    +  public boolean removeFragmentManager(final FragmentHandle handle, final boolean cancel)
{
    +    final FragmentManager manager = managers.remove(handle);
    +    if (manager != null) {
    +      assert !manager.isCancelled() : String.format("Fragment {} manager {} is already
cancelled.", QueryIdHelper.getQueryIdentifier(handle), manager);
    +      if (cancel) {
    +        manager.cancel();
    +      }
    +      if (logger.isDebugEnabled()) {
    +        logger.debug("{} fragment {} manager {} from the work bus.", cancel ? "Cancel
and removed" : "Removed",
    +            QueryIdHelper.getQueryIdentifier(handle), manager);
           }
    -
    -      manager.cancel();
    -      recentlyFinishedFragments.put(handle, 1);
    -      managers.remove(handle);
           return true;
    +    } else if (logger.isWarnEnabled()) {
    +      logger.warn("Fragment {} manager is not found in the work bus.", QueryIdHelper.getQueryIdentifier(handle));
         }
    +    return false;
    --- End diff --
    
    ControlMessageHandler checks the result on line 196.


---

Mime
View raw message