phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tdsi...@apache.org
Subject [04/42] phoenix git commit: Memory Leak in RenewLeaseTask
Date Fri, 23 Dec 2016 01:27:38 GMT
Memory Leak in RenewLeaseTask


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/96b6554f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/96b6554f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/96b6554f

Branch: refs/heads/encodecolumns2
Commit: 96b6554f1572d2134fcae639e0c15c99590d7ab1
Parents: 1704545
Author: Samarth <samarth.jain@salesforce.com>
Authored: Mon Dec 12 12:01:14 2016 -0800
Committer: Samarth <samarth.jain@salesforce.com>
Committed: Mon Dec 12 12:01:14 2016 -0800

----------------------------------------------------------------------
 .../query/ConnectionQueryServicesImpl.java      | 33 +++++++++++++++-----
 1 file changed, 25 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/96b6554f/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 1203c81..9bc088d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -3867,6 +3867,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         private void waitForRandomDuration() throws InterruptedException {
             new CountDownLatch(1).await(random.nextInt(MAX_WAIT_TIME), MILLISECONDS);
         }
+        
+        private static class InternalRenewLeaseTaskException extends Exception {
+            public InternalRenewLeaseTaskException(String msg) {
+                super(msg);
+            }
+        }
 
         @Override
         public void run() {
@@ -3888,7 +3894,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     WeakReference<PhoenixConnection> connRef =
                             connectionsQueue.poll(1, TimeUnit.MILLISECONDS);
                     if (connRef == null) {
-                        throw new IllegalStateException(
+                        throw new InternalRenewLeaseTaskException(
                                 "Connection ref found to be null. This is a bug. Some other thread removed items from the connection queue.");
                     }
                     PhoenixConnection conn = connRef.get();
@@ -3907,7 +3913,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             WeakReference<TableResultIterator> ref =
                                     scannerQueue.poll(1, TimeUnit.MILLISECONDS);
                             if (ref == null) {
-                                throw new IllegalStateException(
+                                throw new InternalRenewLeaseTaskException(
                                         "TableResulIterator ref found to be null. This is a bug. Some other thread removed items from the scanner queue.");
                             }
                             TableResultIterator scanningItr = ref.get();
@@ -3944,13 +3950,24 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     }
                     numConnections--;
                 }
-            } catch (InterruptedException e1) {
+            } catch (InternalRenewLeaseTaskException e) {
+                logger.error("Exception thrown when renewing lease. Draining the queue of scanners ", e);
+                // clear up the queue since the task is about to be unscheduled.
+                connectionsQueue.clear();
+                // throw an exception since we want the task execution to be suppressed because we just encountered an
+                // exception that happened because of a bug.
+                throw new RuntimeException(e);
+            } catch (InterruptedException e) {
                 Thread.currentThread().interrupt(); // restore the interrupt status
-                logger.warn("Thread interrupted when renewing lease ", e1);
-                throw new RuntimeException(e1);
-            } catch (Exception e2) {
-                logger.warn("Exception thrown when renewing lease ", e2);
-                throw new RuntimeException(e2);
+                logger.error("Thread interrupted when renewing lease.", e);
+            } catch (Exception e) {
+                logger.error("Exception thrown when renewing lease ", e);
+                // don't drain the queue and swallow the exception in this case since we don't want the task
+                // execution to be suppressed because renewing lease of a scanner failed.
+            } catch (Throwable e) {
+                logger.error("Exception thrown when renewing lease. Draining the queue of scanners ", e);
+                connectionsQueue.clear(); // clear up the queue since the task is about to be unscheduled.
+                throw new RuntimeException(e);
             }
         }
     }


Mime
View raw message