Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 90C0B179B5 for ; Wed, 16 Sep 2015 23:45:56 +0000 (UTC) Received: (qmail 53043 invoked by uid 500); 16 Sep 2015 23:45:34 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 53008 invoked by uid 500); 16 Sep 2015 23:45:34 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 52993 invoked by uid 99); 16 Sep 2015 23:45:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Sep 2015 23:45:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5C4ADE01F5; Wed, 16 Sep 2015 23:45:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: parthc@apache.org To: commits@drill.apache.org Message-Id: <6061730c3ab24e139aa57afd7f8a3b80@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: drill git commit: DRILL-1942-concurrency-test: new smoke test for concurrent query execution; useful for checking on the new allocator's locking schemes, as well as a general concurrent query execution smoke test. this closes #105 Date: Wed, 16 Sep 2015 23:45:34 +0000 (UTC) Repository: drill Updated Branches: refs/heads/master 9afcf61f6 -> 5fab01fab DRILL-1942-concurrency-test: new smoke test for concurrent query execution; useful for checking on the new allocator's locking schemes, as well as a general concurrent query execution smoke test. 
this closes #105 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5fab01fa Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5fab01fa Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5fab01fa Branch: refs/heads/master Commit: 5fab01fab93c3b51496aec2358645ac2ac78568c Parents: 9afcf61 Author: Chris Westin Authored: Thu Aug 6 17:09:26 2015 -0700 Committer: Parth Chandra Committed: Wed Sep 16 16:41:39 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/drill/BaseTestQuery.java | 2 +- .../drill/TestTpchDistributedConcurrent.java | 212 +++++++++++++++++++ 2 files changed, 213 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/5fab01fa/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index cb137ee..9387662 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -424,7 +424,7 @@ public class BaseTestQuery extends ExecTest { } } - private static class SilentListener implements UserResultsListener { + public static class SilentListener implements UserResultsListener { private volatile UserException exception; private final AtomicInteger count = new AtomicInteger(); private final CountDownLatch latch = new CountDownLatch(1); http://git-wip-us.apache.org/repos/asf/drill/blob/5fab01fa/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java 
b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java new file mode 100644 index 0000000..f6a1f56 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Semaphore; + +import org.apache.drill.QueryTestUtil; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.util.TestTools; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import org.apache.drill.exec.rpc.user.UserResultsListener; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; + +import com.google.common.collect.Sets; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +/* + * Note that the real interest here is that the drillbit doesn't become + * unstable from running a lot of queries concurrently -- it's not about + * any particular order of execution. We ignore the results. + */ +public class TestTpchDistributedConcurrent extends BaseTestQuery { + @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(120000); // Longer timeout than usual. + + /* + * Valid test names taken from TestTpchDistributed. Fuller path prefixes are + * used so that tests may also be taken from other locations -- more variety + * is better as far as this test goes. 
+ */ + private final static String queryFile[] = { + "queries/tpch/01.sql", + "queries/tpch/03.sql", + "queries/tpch/04.sql", + "queries/tpch/05.sql", + "queries/tpch/06.sql", + "queries/tpch/07.sql", + "queries/tpch/08.sql", + "queries/tpch/09.sql", + "queries/tpch/10.sql", + "queries/tpch/11.sql", + "queries/tpch/12.sql", + "queries/tpch/13.sql", + "queries/tpch/14.sql", + // "queries/tpch/15.sql", this creates a view + "queries/tpch/16.sql", + "queries/tpch/18.sql", + "queries/tpch/19_1.sql", + "queries/tpch/20.sql", + }; + + private final static int TOTAL_QUERIES = 115; + private final static int CONCURRENT_QUERIES = 15; + + private final static Random random = new Random(0xdeadbeef); + private final static String alterSession = "alter session set `planner.slice_target` = 10"; + + private int remainingQueries = TOTAL_QUERIES - CONCURRENT_QUERIES; + private final Semaphore completionSemaphore = new Semaphore(0); + private final Semaphore submissionSemaphore = new Semaphore(0); + private final Set listeners = Sets.newIdentityHashSet(); + private Thread testThread = null; // used to interrupt semaphore wait in case of error + + private static class FailedQuery { + final String queryFile; + final UserException userEx; + + public FailedQuery(final String queryFile, final UserException userEx) { + this.queryFile = queryFile; + this.userEx = userEx; + } + } + + private final List failedQueries = new LinkedList<>(); + + private void submitRandomQuery() { + final String filename = queryFile[random.nextInt(queryFile.length)]; + final String query; + try { + query = QueryTestUtil.normalizeQuery(getFile(filename)).replace(';', ' '); + } catch(IOException e) { + throw new RuntimeException("Caught exception", e); + } + final UserResultsListener listener = new ChainingSilentListener(query); + client.runQuery(UserBitShared.QueryType.SQL, query, listener); + synchronized(this) { + listeners.add(listener); + } + } + + private class ChainingSilentListener extends SilentListener { 
+ private final String query; + + public ChainingSilentListener(final String query) { + this.query = query; + } + + @Override + public void queryCompleted(QueryState state) { + super.queryCompleted(state); + + completionSemaphore.release(); + synchronized(TestTpchDistributedConcurrent.this) { + final Object object = listeners.remove(this); + assertNotNull("listener not found", object); + + /* Only submit more queries if there hasn't been an error. */ + if ((failedQueries.size() == 0) && (remainingQueries > 0)) { + /* + * We can't directly submit the query from here, because we're on the RPC + * thread, and it throws an exception if we try to send from here. So we + * allow the QuerySubmitter thread to advance. + */ + submissionSemaphore.release(); + --remainingQueries; + } + } + } + + @Override + public void submissionFailed(UserException uex) { + super.submissionFailed(uex); + + completionSemaphore.release(); + System.out.println("submissionFailed for " + query + "\nwith " + uex); + synchronized(TestTpchDistributedConcurrent.this) { + final Object object = listeners.remove(this); + assertNotNull("listener not found", object); + failedQueries.add(new FailedQuery(query, uex)); + testThread.interrupt(); + } + } + } + + private class QuerySubmitter extends Thread { + @Override + public void run() { + while(true) { + try { + submissionSemaphore.acquire(); + } catch(InterruptedException e) { + System.out.println("QuerySubmitter quitting."); + return; + } + + submitRandomQuery(); + } + } + } + + @Test + public void testConcurrentQueries() throws Exception { + QueryTestUtil.testRunAndPrint(client, UserBitShared.QueryType.SQL, alterSession); + + testThread = Thread.currentThread(); + final QuerySubmitter querySubmitter = new QuerySubmitter(); + querySubmitter.start(); + + // Kick off the initial queries. As they complete, they will submit more. + submissionSemaphore.release(CONCURRENT_QUERIES); + + // Wait for all the queries to complete. 
+ InterruptedException interruptedException = null; + try { + completionSemaphore.acquire(TOTAL_QUERIES); + } catch(InterruptedException e) { + interruptedException = e; + + // List the failed queries. + for(final FailedQuery fq : failedQueries) { + System.err.println(String.format( + "%s failed with %s", fq.queryFile, fq.userEx)); + } + } + + // Stop the querySubmitter thread. + querySubmitter.interrupt(); + + assertNull("Query error caused interruption", interruptedException); + + final int nListeners = listeners.size(); + assertEquals(nListeners + " listeners still exist", 0, nListeners); + + assertEquals("Didn't submit all queries", 0, remainingQueries); + assertEquals("Queries failed", 0, failedQueries.size()); + } +}