drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject drill git commit: DRILL-3147: tpcds-sf1-parquet query 73 causes memory leak
Date Thu, 25 Jun 2015 19:54:59 GMT
Repository: drill
Updated Branches:
  refs/heads/master c1b847acd -> 403dc5cdc


DRILL-3147: tpcds-sf1-parquet query 73 causes memory leak

- each time fragment A sends a "receiver finished" message to fragment B, fragment B's id is
  added to the FragmentContext.ignoredSenders list
- refactored UnorderedReceiverBatch.informSenders() and MergingRecordBatch.informSenders()
  by moving this method to FragmentContext
- DataServer.send() uses FragmentContext.ignoredSenders to decide whether a batch should be
  passed to the fragment or discarded right away
- BaseRawBatchBuffer methods enqueue() and kill() are now synchronized
- TestTpcdsSf1Leaks test reproduces the leak; it is ignored by default because it requires
  a large dataset


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/403dc5cd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/403dc5cd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/403dc5cd

Branch: refs/heads/master
Commit: 403dc5cdc6d34b24c1caca7cb574eb9e9727afe4
Parents: c1b847a
Author: adeneche <adeneche@gmail.com>
Authored: Mon May 18 18:02:12 2015 -0700
Committer: Jason Altekruse <altekrusejason@gmail.com>
Committed: Thu Jun 25 12:01:00 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/rpc/data/DataServer.java  | 41 +++--------
 .../exec/work/batch/BaseRawBatchBuffer.java     | 11 ++-
 .../exec/work/fragment/RootFragmentManager.java |  2 +-
 .../java/org/apache/drill/BaseTestQuery.java    | 14 ++--
 .../drill/exec/server/TestTpcdsSf1Leaks.java    | 75 ++++++++++++++++++++
 .../src/test/resources/tpcds-sf1/q73.sql        | 27 +++++++
 6 files changed, 129 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 80d2d6e..4908c18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -122,7 +122,6 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection>
{
     final FragmentRecordBatch fragmentBatch = get(pBody, FragmentRecordBatch.PARSER);
     final int targetCount = fragmentBatch.getReceivingMinorFragmentIdCount();
 
-    Pointer<DrillBuf> out = new Pointer<DrillBuf>();
     AckSender ack = new AckSender(sender);
     // increment so we don't get false returns.
     ack.increment();
@@ -139,12 +138,8 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection>
{
         }
 
       }else{
-        if (targetCount > 1) {
-          for (int minor = 0; minor < targetCount; minor++) {
-            send(fragmentBatch, (DrillBuf) body, minor, ack, true);
-          }
-        } else {
-          send(fragmentBatch, (DrillBuf) body, 0, ack, false);
+        for (int minor = 0; minor < targetCount; minor++) {
+          send(fragmentBatch, (DrillBuf) body, minor, ack);
         }
       }
     } catch (IOException | FragmentSetupException e) {
@@ -158,34 +153,23 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection>
{
 
       // decrement the extra reference we grabbed at the top.
       ack.sendOk();
-      if(out != null && out.value != null){
-        out.value.release();
-      }
     }
   }
 
-  private void send(final FragmentRecordBatch fragmentBatch, final DrillBuf body, final int
minor, final AckSender ack,
-      final boolean shared)
+  private void send(final FragmentRecordBatch fragmentBatch, final DrillBuf body, final int
minor, final AckSender ack)
       throws FragmentSetupException, IOException {
 
-    FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
+    final FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
     if (manager == null) {
       return;
     }
 
     final BufferAllocator allocator = manager.getFragmentContext().getAllocator();
-    final Pointer<DrillBuf> out = new Pointer<DrillBuf>();
+    final Pointer<DrillBuf> out = new Pointer<>();
 
     final boolean withinMemoryEnvelope;
-    final DrillBuf submitBody;
-
-    if (shared) {
-      withinMemoryEnvelope = allocator.takeOwnership((DrillBuf) body, out);
-      submitBody = out.value;
-    }else{
-      withinMemoryEnvelope = allocator.takeOwnership((DrillBuf) body.unwrap());
-      submitBody = body;
-    }
+
+    withinMemoryEnvelope = allocator.takeOwnership(body, out);
 
     if (!withinMemoryEnvelope) {
       // if we over reserved, we need to add poison pill before batch.
@@ -193,14 +177,11 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection>
{
     }
 
     ack.increment();
-    dataHandler.handle(manager, fragmentBatch, submitBody, ack);
-
-    if (shared) {
-      // make sure to release the reference count we have to the new buffer.
-      // dataHandler.handle should have taken any ownership it needed.
-      out.value.release();
-    }
+    dataHandler.handle(manager, fragmentBatch, out.value, ack);
 
+    // make sure to release the reference count we have to the new buffer.
+    // dataHandler.handle should have taken any ownership it needed.
+    out.value.release();
   }
 
   private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {

http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 11b6cc8..fbffd87 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -26,9 +26,9 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RawFragmentBatch;
 
 public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRawBatchBuffer.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRawBatchBuffer.class);
 
-  private static enum BufferState {
+  private enum BufferState {
     INIT,
     STREAMS_FINISHED,
     KILLED
@@ -61,7 +61,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer
{
   }
 
   @Override
-  public void enqueue(final RawFragmentBatch batch) throws IOException {
+  public synchronized void enqueue(final RawFragmentBatch batch) throws IOException {
 
     // if this fragment is already canceled or failed, we shouldn't need any or more stuff.
We do the null check to
     // ensure that tests run.
@@ -113,8 +113,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer
{
   public void close() {
     if (!isTerminated() && context.shouldContinue()) {
       final String msg = String.format("Cleanup before finished. %d out of %d strams have
finished", completedStreams(), fragmentCount);
-      final IllegalStateException e = new IllegalStateException(msg);
-      throw e;
+      throw  new IllegalStateException(msg);
     }
 
     if (!bufferQueue.isEmpty()) {
@@ -127,7 +126,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer
{
   }
 
   @Override
-  public void kill(final FragmentContext context) {
+  public synchronized void kill(final FragmentContext context) {
     state = BufferState.KILLED;
     clearBufferWithBody();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index b770a33..f4f76dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -30,7 +30,7 @@ import org.apache.drill.exec.work.batch.IncomingBuffers;
 
 // TODO a lot of this is the same as NonRootFragmentManager
 public class RootFragmentManager implements FragmentManager {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
 
   private final IncomingBuffers buffers;
   private final FragmentExecutor runner;

http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 3d09d6a..46186df 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URL;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
@@ -47,11 +46,8 @@ import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.util.JsonStringArrayList;
-import org.apache.drill.exec.util.JsonStringHashMap;
 import org.apache.drill.exec.util.TestUtilities;
 import org.apache.drill.exec.util.VectorUtil;
-import org.apache.hadoop.io.Text;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.rules.TestRule;
@@ -65,6 +61,7 @@ import com.google.common.io.Resources;
 import static org.hamcrest.core.StringContains.containsString;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 public class BaseTestQuery extends ExecTest {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
@@ -393,6 +390,15 @@ public class BaseTestQuery extends ExecTest {
     return dir.getAbsolutePath() + File.separator + dirName;
   }
 
+
+  protected static void setSessionOption(final String option, final String value) {
+    try {
+      runSQL(String.format("alter session set `%s` = %s", option, value));
+    } catch(final Exception e) {
+      fail(String.format("Failed to set session option `%s` = %s, Error: %s", option, value,
e.toString()));
+    }
+  }
+
   private static class SilentListener implements UserResultsListener {
     private volatile UserException exception;
     private AtomicInteger count = new AtomicInteger();

http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestTpcdsSf1Leaks.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestTpcdsSf1Leaks.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestTpcdsSf1Leaks.java
new file mode 100644
index 0000000..ba19e0d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestTpcdsSf1Leaks.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server;
+
+import org.apache.drill.BaseTestQuery;
+
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET;
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
+import static org.junit.Assert.fail;
+
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+
+/**
+ * To run this unit class you need to download the following data file:
+ * http://apache-drill.s3.amazonaws.com/files/tpcds-sf1-parquet.tgz
+ * and untar it into some folder (e.g. /tpcds-sf1-parquet), then add the following workspace to
+ * exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
+ *
+ * ,"tpcds" : {
+ *   location: "/tpcds-sf1-parquet",
+ *   writable: false
+ * }
+ *
+ */
+@Ignore
+public class TestTpcdsSf1Leaks extends BaseTestQuery {
+
+  @Rule
+  final public TestRule TIMEOUT = new Timeout(0); // wait forever
+
+  @BeforeClass
+  public static void initCluster() {
+    updateTestCluster(3, null);
+  }
+
+  @Test
+  public void test() throws Exception {
+    setSessionOption(SLICE_TARGET, "10");
+    try {
+      final String query = getFile("tpcds-sf1/q73.sql");
+      for (int i = 0; i < 20; i++) {
+        System.out.printf("%nRun #%d%n", i+1);
+
+        try {
+          runSQL(query);
+        } catch (final Exception e) {
+          fail("query failed: " + e.getMessage());
+        }
+      }
+    }finally {
+      setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/test/resources/tpcds-sf1/q73.sql
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/tpcds-sf1/q73.sql b/exec/java-exec/src/test/resources/tpcds-sf1/q73.sql
new file mode 100644
index 0000000..094ca2b
--- /dev/null
+++ b/exec/java-exec/src/test/resources/tpcds-sf1/q73.sql
@@ -0,0 +1,27 @@
+select c.c_last_name,
+  c.c_first_name,
+  c.c_salutation,
+  c.c_preferred_cust_flag,
+  dj.sstn,
+  dj.cnt
+from (
+  select ss.ss_ticket_number as sstn, ss.ss_customer_sk as sscsk, count(*) cnt
+  from dfs_test.tpcds.store_sales as ss,
+    dfs_test.tpcds.date_dim as d,
+    dfs_test.tpcds.store as s,
+    dfs_test.tpcds.household_demographics as hd
+  where ss.ss_sold_date_sk = d.d_date_sk
+    and ss.ss_store_sk = s.s_store_sk
+    and ss.ss_hdemo_sk = hd.hd_demo_sk
+    and (hd.hd_buy_potential = '>10000' or hd.hd_buy_potential = 'unknown')
+    and hd.hd_vehicle_count > 0
+    and case when hd.hd_vehicle_count > 0 then hd.hd_dep_count / hd.hd_vehicle_count else
null end > 1
+    and s.s_county in ('Saginaw County', 'Sumner County', 'Appanoose County', 'Daviess County')
+    and ss.ss_sold_date_sk between 2451180 and 2451269
+  group by ss.ss_ticket_number, ss.ss_customer_sk
+) dj,
+  dfs_test.tpcds.customer as c
+where dj.sscsk = c.c_customer_sk
+  and dj.cnt between 1 and 5
+order by dj.cnt desc
+limit 1000
\ No newline at end of file


Mime
View raw message