drill-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-1942) Improve off-heap memory usage tracking
Date Thu, 10 Sep 2015 21:20:46 GMT

    [ https://issues.apache.org/jira/browse/DRILL-1942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14739620#comment-14739620 ]

ASF GitHub Bot commented on DRILL-1942:
---------------------------------------

Github user sudheeshkatkam commented on a diff in the pull request:

    https://github.com/apache/drill/pull/105#discussion_r39214895
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
---
    @@ -0,0 +1,206 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill;
    +
    +import java.io.IOException;
    +import java.util.IdentityHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.Semaphore;
    +
    +import org.apache.drill.QueryTestUtil;
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.util.TestTools;
    +import org.apache.drill.exec.proto.UserBitShared;
    +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
    +import org.apache.drill.exec.rpc.user.UserResultsListener;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TestRule;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.fail;
    +
    +/*
    + * Note that the real interest here is that the drillbit doesn't become
    + * unstable from running a lot of queries concurrently -- it's not about
    + * any particular order of execution. We ignore the results.
    + */
    +public class TestTpchDistributedConcurrent extends BaseTestQuery {
    +  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(120000); // Longer timeout
than usual.
    +
    +  /*
    +   * Valid test names taken from TestTpchDistributed. Fuller path prefixes are
    +   * used so that tests may also be taken from other locations -- more variety
    +   * is better as far as this test goes.
    +   */
    +  private final static String queryFile[] = {
    +    "queries/tpch/01.sql",
    +    "queries/tpch/03.sql",
    +    "queries/tpch/04.sql",
    +    "queries/tpch/05.sql",
    +    "queries/tpch/06.sql",
    +    "queries/tpch/07.sql",
    +    "queries/tpch/08.sql",
    +    "queries/tpch/09.sql",
    +    "queries/tpch/10.sql",
    +    "queries/tpch/11.sql",
    +    "queries/tpch/12.sql",
    +    "queries/tpch/13.sql",
    +    "queries/tpch/14.sql",
    +    // "queries/tpch/15.sql", this creates a view
    +    "queries/tpch/16.sql",
    +    "queries/tpch/18.sql",
    +    "queries/tpch/19_1.sql",
    +    "queries/tpch/20.sql",
    +  };
    +
    +  private final static int TOTAL_QUERIES = 115;
    +  private final static int CONCURRENT_QUERIES = 15;
    +
    +  private final static Random random = new Random(0xdeadbeef);
    +  private final static String alterSession = "alter session set `planner.slice_target`
= 10";
    +
    +  private int remainingQueries = TOTAL_QUERIES - CONCURRENT_QUERIES;
    +  private final Semaphore completionSemaphore = new Semaphore(0);
    +  private final Semaphore submissionSemaphore = new Semaphore(0);
    +  private final IdentityHashMap<UserResultsListener, Object> listeners = new IdentityHashMap<>();
    +  private Thread testThread = null; // used to interrupt semaphore wait in case of error
    +
    +  /** A query whose submission failed, paired with the exception reported for it. */
    +  private static class FailedQuery {
    +    /**
    +     * Identifies the failed query. Despite the field name, submissionFailed()
    +     * passes the query's SQL text here, not its file name.
    +     */
    +    private final String queryFile;
    +    /** The exception delivered to submissionFailed(). */
    +    private final UserException userEx;
    +
    +    public FailedQuery(final String queryFile, final UserException userEx) {
    +      this.queryFile = queryFile;
    +      this.userEx = userEx;
    +    }
    +  }
    +
    +  /** Failures recorded by submissionFailed(); guarded by {@code this}. */
    +  private final List<FailedQuery> failedQueries = new LinkedList<>();
    +
    +  /**
    +   * Picks a random query file, loads and normalizes its SQL, and submits it;
    +   * results are delivered to a fresh {@link ChainingSilentListener}.
    +   *
    +   * @throws RuntimeException if the query file cannot be read
    +   */
    +  private void submitRandomQuery() {
    +    final String filename = queryFile[random.nextInt(queryFile.length)];
    +    final String query;
    +    try {
    +      // Semicolons are replaced with spaces before the query is submitted.
    +      query = QueryTestUtil.normalizeQuery(getFile(filename)).replace(';', ' ');
    +    } catch(IOException e) {
    +      throw new RuntimeException("Caught exception", e);
    +    }
    +    final UserResultsListener listener = new ChainingSilentListener(query);
    +    /*
    +     * Register the listener before submitting. Its callbacks run on the RPC
    +     * thread and remove it from the map (asserting that it is present), so
    +     * registering only after runQuery() would race with a query that
    +     * completes or fails quickly.
    +     */
    +    synchronized(this) {
    +      listeners.put(listener, listener);
    +    }
    +    client.runQuery(UserBitShared.QueryType.SQL, query, listener);
    +  }
    +
    +  /**
    +   * Results listener that, each time its query completes, lets the
    +   * QuerySubmitter thread submit one more query. It stops doing so once
    +   * remainingQueries reaches zero or any submission failure has been recorded.
    +   * Result handling is inherited from SilentListener, which is not shown
    +   * here; presumably it discards results.
    +   *
    +   * <p>Callbacks run on the RPC thread (see queryCompleted). The listener map,
    +   * failure list and remaining-query counter are only touched while holding
    +   * the enclosing test instance's monitor.
    +   */
    +  private class ChainingSilentListener extends SilentListener {
    +    /** SQL text of this listener's query, used in failure reports. */
    +    private final String query;
    +
    +    public ChainingSilentListener(final String query) {
    +      this.query = query;
    +    }
    +
    +    /**
    +     * Releases one completion permit, unregisters this listener and, if allowed,
    +     * releases one submission permit while decrementing remainingQueries.
    +     */
    +    @Override
    +    public void queryCompleted(QueryState state) {
    +      super.queryCompleted(state);
    +
    +      completionSemaphore.release();
    +      synchronized(TestTpchDistributedConcurrent.this) {
    +        // The listener must already be registered when this callback runs.
    +        // NOTE(review): this assertion executes on the RPC thread, not the
    +        // JUnit test thread -- confirm how such a failure is surfaced.
    +        final Object object = listeners.remove(this);
    +        assertNotNull("listener not found", object);
    +
    +        /* Only submit more queries if there hasn't been an error. */
    +        if ((failedQueries.size() == 0) && (remainingQueries > 0)) {
    +          /*
    +           * We can't directly submit the query from here, because we're on the RPC
    +           * thread, and it throws an exception if we try to send from here. So we
    +           * allow the QuerySubmitter thread to advance.
    +           */
    +          submissionSemaphore.release();
    +          --remainingQueries;
    +        }
    +      }
    +    }
    +
    +    /**
    +     * Releases one completion permit, unregisters this listener, records the
    +     * failure and interrupts the test thread so it stops waiting in
    +     * completionSemaphore.acquire(). No further submission permit is released.
    +     */
    +    @Override
    +    public void submissionFailed(UserException uex) {
    +      super.submissionFailed(uex);
    +
    +      completionSemaphore.release();
    +      System.out.println("submissionFailed for " + query + "\nwith " + uex);
    +      synchronized(TestTpchDistributedConcurrent.this) {
    +        final Object object = listeners.remove(this);
    +        assertNotNull("listener not found", object);
    +        failedQueries.add(new FailedQuery(query, uex));
    +        testThread.interrupt();
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Background thread that submits one random query for each permit it takes
    +   * from submissionSemaphore. Submission happens here rather than in the
    +   * listener callbacks because those run on the RPC thread.
    +   */
    +  private class QuerySubmitter extends Thread {
    +    /**
    +     * Loops until interrupted while waiting for a permit. If a submission fails
    +     * with an unchecked exception, the test thread is interrupted and the
    +     * exception is rethrown.
    +     */
    +    @Override
    +    public void run() {
    +      while(true) {
    +        try {
    +          submissionSemaphore.acquire();
    +        } catch(InterruptedException e) {
    +          System.out.println("QuerySubmitter quitting.");
    +          return;
    +        }
    +
    +        try {
    +          submitRandomQuery();
    +        } catch(RuntimeException e) {
    +          /*
    +           * Otherwise the exception would end only this thread, and the test
    +           * thread would keep blocking in completionSemaphore.acquire() for
    +           * completions that never arrive, until the timeout rule fires.
    +           */
    +          testThread.interrupt();
    +          throw e;
    +        }
    +      }
    +    }
    +  }
    +
    +  @Test
    +  public void testConcurrentQueries() throws Exception {
    +    QueryTestUtil.testRunAndPrint(client, UserBitShared.QueryType.SQL, alterSession);
    +
    +    testThread = Thread.currentThread();
    +    final QuerySubmitter querySubmitter = new QuerySubmitter();
    +    querySubmitter.start();
    +
    +    // Kick off the initial queries. As they complete, they will submit more.
    +    submissionSemaphore.release(CONCURRENT_QUERIES);
    +
    +    // Wait for all the queries to complete.
    +    try {
    +      completionSemaphore.acquire(TOTAL_QUERIES);
    +    } catch(InterruptedException e) {
    +      fail("caught " + e);
    --- End diff --
    
    If ```testThread``` is interrupted, this ```fail``` will trigger an ```AssertionError("...")```;
    however, ```querySubmitter``` is not interrupted, and the ```System.err.println("...")```
    calls do not happen, right?


> Improve off-heap memory usage tracking
> --------------------------------------
>
>                 Key: DRILL-1942
>                 URL: https://issues.apache.org/jira/browse/DRILL-1942
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>            Reporter: Chris Westin
>            Assignee: Chris Westin
>             Fix For: 1.2.0
>
>         Attachments: DRILL-1942.1.patch.txt, DRILL-1942.2.patch.txt, DRILL-1942.3.patch.txt
>
>
> We're using a lot more memory than we think we should. We may be leaking it, or not releasing
> it as soon as we could.
> This is a call to develop improved tracking so that we can collect statistics on exactly
> where we're using it, and whether or not we can release it earlier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message