drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [14/16] incubator-drill git commit: Update WorkEventBus to immediately fail if Fragment is unavailable. No need to wait now that we propagate intermediate fragments before leaf fragments.
Date Sat, 08 Nov 2014 00:03:15 GMT
Update WorkEventBus to immediately fail if Fragment is unavailable.  No need to wait now that
we propagate intermediate fragments before leaf fragments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/797b1bcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/797b1bcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/797b1bcf

Branch: refs/heads/master
Commit: 797b1bcffbcda4dd5e7c3be1bb3ba1e9593cff16
Parents: 06f0e17
Author: Jacques Nadeau <jacques@apache.org>
Authored: Thu Nov 6 20:43:32 2014 -0800
Committer: Jinfeng Ni <jni@maprtech.com>
Committed: Fri Nov 7 10:50:57 2014 -0800

----------------------------------------------------------------------
 .../drill/exec/rpc/control/WorkEventBus.java    | 49 +++-----------------
 1 file changed, 7 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/797b1bcf/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index 23380ff..eae7b5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -17,25 +17,19 @@
  */
 package org.apache.drill.exec.rpc.control;
 
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.drill.exec.cache.DistributedMap;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.foreman.Foreman;
 import org.apache.drill.exec.work.foreman.FragmentStatusListener;
 import org.apache.drill.exec.work.fragment.FragmentManager;
-import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
-import org.apache.drill.exec.work.fragment.RootFragmentManager;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -48,8 +42,9 @@ public class WorkEventBus {
   private final ConcurrentMap<QueryId, FragmentStatusListener> listeners = new ConcurrentHashMap<QueryId, FragmentStatusListener>(
       16, 0.75f, 16);
   private final WorkerBee bee;
-  private final Cache<FragmentHandle,Void> cancelledFragments = CacheBuilder.newBuilder()
+  private final Cache<FragmentHandle,Integer> recentlyFinishedFragments = CacheBuilder.newBuilder()
           .maximumSize(10000)
+
           .expireAfterWrite(10, TimeUnit.MINUTES)
           .build();
 
@@ -85,14 +80,10 @@ public class WorkEventBus {
 
   public void setFragmentManager(FragmentManager fragmentManager) {
     logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
-
-    synchronized (managers) {
-      FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
-      managers.notifyAll();
+    FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
       if (old != null) {
         throw new IllegalStateException(
             "Tried to set fragment manager when has already been set for the provided fragment handle.");
-      }
     }
   }
 
@@ -102,54 +93,28 @@ public class WorkEventBus {
   }
 
   public FragmentManager getFragmentManager(FragmentHandle handle) throws FragmentSetupException {
-
     // check if this was a recently canceled fragment.  If so, throw away message.
-    if (cancelledFragments.asMap().containsKey(handle)) {
+    if (recentlyFinishedFragments.asMap().containsKey(handle)) {
       logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
       return null;
     }
 
-    // chm manages concurrency better then everyone fighting for the same lock so we'll do
a double check.
+    // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
     FragmentManager m = managers.get(handle);
     if(m != null){
       return m;
     }
-
-    logger.debug("Fragment was requested but no manager exists.  Waiting for manager for
fragment: {}", QueryIdHelper.getQueryIdentifier(handle));
-    try{
-    // We need to handle the race condition between the fragments being sent to leaf nodes
and intermediate nodes.  It is possible that a leaf node would send a data batch to a intermediate
node before the intermediate node received the associated plan.  As such, we will wait here
for a bit to see if the appropriate fragment shows up.
-    long expire = System.currentTimeMillis() + 30*1000;
-    synchronized(managers){
-
-      // we loop because we may be woken up by some other, unrelated manager insertion.
-      while(true){
-        m = managers.get(handle);
-        if(m != null) {
-          return m;
-        }
-        long timeToWait = expire - System.currentTimeMillis();
-        if(timeToWait <= 0){
-          break;
-        }
-
-        managers.wait(timeToWait);
-      }
-
-      throw new FragmentSetupException("Failed to receive plan fragment that was required
for id: " + QueryIdHelper.getQueryIdentifier(handle));
-    }
-    }catch(InterruptedException e){
-      throw new FragmentSetupException("Interrupted while waiting to receive plan fragment..");
-    }
+    throw new FragmentSetupException("Failed to receive plan fragment that was required for id: " + QueryIdHelper.getQueryIdentifier(handle));
   }
 
   public void cancelFragment(FragmentHandle handle) {
     logger.debug("Fragment canceled: {}", QueryIdHelper.getQueryIdentifier(handle));
-    cancelledFragments.put(handle, null);
     removeFragmentManager(handle);
   }
 
   public void removeFragmentManager(FragmentHandle handle) {
     logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
+    recentlyFinishedFragments.put(handle,  1);
     managers.remove(handle);
   }
 


Mime
View raw message