Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 124B82009C6 for ; Mon, 2 May 2016 03:19:59 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 05E2A1609AD; Mon, 2 May 2016 03:19:59 +0200 (CEST) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4C40316098E for ; Mon, 2 May 2016 03:19:58 +0200 (CEST) Received: (qmail 70679 invoked by uid 500); 2 May 2016 01:19:57 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 70668 invoked by uid 99); 2 May 2016 01:19:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 May 2016 01:19:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 47A40DFB73; Mon, 2 May 2016 01:19:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hashutosh@apache.org To: commits@hive.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-13512 : Make initializing dag ids in TezWork thread safe for parallel compilation (Peter Slawski via Gopal V) Date: Mon, 2 May 2016 01:19:57 +0000 (UTC) archived-at: Mon, 02 May 2016 01:19:59 -0000 Repository: hive Updated Branches: refs/heads/master b4b821e0a -> 13bc529f4 HIVE-13512 : Make initializing dag ids in TezWork thread safe for parallel compilation (Peter Slawski via Gopal V) Signed-off-by: Ashutosh Chauhan Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/13bc529f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/13bc529f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/13bc529f Branch: refs/heads/master Commit: 13bc529f44318bf4cfe97c2391dca3d461dc9ec7 Parents: b4b821e Author: Peter Slawski Authored: Wed Apr 13 19:54:00 2016 -0800 Committer: Ashutosh Chauhan Committed: Sun May 1 18:17:00 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/ql/plan/TezWork.java | 9 ++- .../hive/ql/plan/TestTezWorkConcurrency.java | 65 ++++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/13bc529f/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java index c6ef829..7a70e6b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java @@ -29,6 +29,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -69,7 +70,7 @@ public class TezWork extends AbstractOperatorDesc { private static transient final Logger LOG = LoggerFactory.getLogger(TezWork.class); - private static int counter; + private static final AtomicInteger counter = new AtomicInteger(1); private final String dagId; private final String queryName; private final Set roots = new HashSet(); @@ -80,8 +81,12 @@ public class TezWork extends AbstractOperatorDesc { new HashMap, TezEdgeProperty>(); private final Map workVertexTypeMap = new HashMap(); + public TezWork(String queryId) { + this(queryId, null); + } + public TezWork(String queryId, Configuration conf) { - this.dagId = queryId + ":" + (++counter); + this.dagId = queryId + ":" + counter.getAndIncrement(); String queryName = (conf != null) ? DagUtils.getUserSpecifiedDagName(conf) : null; if (queryName == null) { queryName = this.dagId; http://git-wip-us.apache.org/repos/asf/hive/blob/13bc529f/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java new file mode 100644 index 0000000..c59fd10 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWorkConcurrency.java @@ -0,0 +1,65 @@ +package org.apache.hadoop.hive.ql.plan; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.Test; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; + +import static org.junit.Assert.assertEquals; + +public final class TestTezWorkConcurrency { + + @Test + public void ensureDagIdIsUnique() throws Exception { + final int threadCount = 5; + final CountDownLatch threadReadyToStartSignal = new CountDownLatch(threadCount); + final CountDownLatch startThreadSignal = new CountDownLatch(1); + final int numberOfTezWorkToCreatePerThread = 100; + + List>> tasks = Lists.newArrayList(); + for (int i = 0; i < threadCount; i++) { + tasks.add(new FutureTask<>(new Callable>() { + @Override + public Set call() throws Exception { + threadReadyToStartSignal.countDown(); + startThreadSignal.await(); + return generateTezWorkDagIds(numberOfTezWorkToCreatePerThread); + } + })); + } + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + for (FutureTask> task : tasks) { + executor.execute(task); + } + threadReadyToStartSignal.await(); + startThreadSignal.countDown(); + Set allTezWorkDagIds = getAllTezWorkDagIds(tasks); + assertEquals(threadCount * numberOfTezWorkToCreatePerThread, allTezWorkDagIds.size()); + } + + private static Set generateTezWorkDagIds(int numberOfNames) { + Set tezWorkIds = Sets.newHashSet(); + for (int i = 0; i < numberOfNames; i++) { + TezWork work = new TezWork("query-id"); + tezWorkIds.add(work.getDagId()); + } + return tezWorkIds; + } + + private static Set getAllTezWorkDagIds(List>> tasks) + throws ExecutionException, InterruptedException { + Set allTezWorkDagIds = Sets.newHashSet(); + for (FutureTask> task : tasks) { + allTezWorkDagIds.addAll(task.get()); + } + return allTezWorkDagIds; + } +}