Date: Fri, 10 Nov 2017 09:35:22 -0800
From: "Murtadha Hubail (Code Review)"
CC: Jenkins, Till Westmann, abdullah alamoudi, Michael Blow, Dmitry Lychagin
Reply-To: mhubail@apache.org
Subject: Change in asterixdb[master]: [ASTERIXDB-1871][ASTERIXDB-2095] Stop Consumer Thread on Dea...
X-Gerrit-MessageType: merged
X-Gerrit-Change-Id: I18c9fb085c149f41a202fff83aa6ec3aaeba6a77
X-Gerrit-Commit: c90f1e38af3fc59a019a8b551eac43ea8ef3cdd5

Murtadha Hubail has submitted this change and it was merged.

Change subject: [ASTERIXDB-1871][ASTERIXDB-2095] Stop Consumer Thread on Deallocate
......................................................................

[ASTERIXDB-1871][ASTERIXDB-2095] Stop Consumer Thread on Deallocate

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- A MaterializingPipelinedPartition can currently be deallocated before
  its consumer thread starts (e.g. because the job was aborted). In that
  case the consumer thread is never interrupted, which leaks threads and
  files. With this change, the consumer checks whether the partition was
  deallocated before it starts; if so, the consumer thread cleans up any
  files and exits.
- Make TaskAttemptId a non-final class so that it can be mocked.
- Add a test case.
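The Details above describe a check-then-act race: an interrupt sent by deallocate() cannot reach a consumer thread that has not started yet. The following is a minimal, simplified Java sketch of the guard pattern, not the Hyracks code. DeallocationGuardSketch is a hypothetical class, and its consume() and cleanUp() methods only stand in for reading the run file and deleting it. Because deallocate() and the consumer's start-up check hold the same monitor, one of two things happens: either deallocate() sees a registered consumer and interrupts it, or the consumer sees deallocated == true and exits after cleaning up.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified model of the "deallocated" guard; not the Hyracks class. */
public class DeallocationGuardSketch {
    private Thread consumerThread; // guarded by "this"
    private boolean deallocated;   // guarded by "this"
    private volatile boolean consumedData;
    private volatile boolean cleanedUp;

    /** Interrupts a consumer that is already registered and records the deallocation. */
    public synchronized void deallocate() {
        if (consumerThread != null) {
            consumerThread.interrupt(); // only reaches a consumer that has already registered
        }
        deallocated = true; // lets a consumer that starts later notice and bail out
    }

    /** Starts the consumer on a new thread and returns it so callers can join it. */
    public Thread writeTo() {
        Thread consumer = new Thread(() -> {
            synchronized (DeallocationGuardSketch.this) {
                consumerThread = Thread.currentThread();
                if (deallocated) {
                    cleanUp(); // release resources instead of reading them
                    return;
                }
            }
            try {
                consume();
            } finally {
                cleanUp();
            }
        }, "consumer-sketch");
        consumer.start();
        return consumer;
    }

    private void consume() {
        consumedData = true; // stands in for reading frames and forwarding them
    }

    private void cleanUp() {
        cleanedUp = true; // stands in for closing handles and deleting the run file
    }

    public boolean consumedData() {
        return consumedData;
    }

    public boolean cleanedUp() {
        return cleanedUp;
    }

    public static void main(String[] args) throws InterruptedException {
        DeallocationGuardSketch partition = new DeallocationGuardSketch();
        partition.deallocate();                // deallocated before any consumer exists
        Thread consumer = partition.writeTo(); // the consumer starts afterwards
        consumer.join(TimeUnit.SECONDS.toMillis(5));
        // prints "alive=false consumed=false cleanedUp=true"
        System.out.println("alive=" + consumer.isAlive() + " consumed=" + partition.consumedData()
                + " cleanedUp=" + partition.cleanedUp());
    }
}
```

main() reproduces the ordering that DeallocatableTest exercises: deallocate first, start the consumer afterwards. The consumer then exits without consuming anything. Without the shared lock, the flag could be set after the consumer's check but before it registers itself, and the interrupt would still be missed.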
Change-Id: I18c9fb085c149f41a202fff83aa6ec3aaeba6a77
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2143
Sonar-Qube: Jenkins
Tested-by: Jenkins
Integration-Tests: Jenkins
Contrib: Jenkins
Reviewed-by: Till Westmann
---
M asterixdb/asterix-app/pom.xml
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
6 files changed, 117 insertions(+), 19 deletions(-)

Approvals:
  Anon. E. Moose #1000171:
  Till Westmann: Looks good to me, approved
  Jenkins: Verified; No violations found; ; Verified


diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index b039071..c162e14 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -577,5 +577,10 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-comm</artifactId>
+      <version>${hyracks.version}</version>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 03f42f5..d80eabc 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -189,7 +189,7 @@
         return num;
     }
 
-    private static void checkThreadLeaks() throws IOException {
+    public static void checkThreadLeaks() throws IOException {
         String threadDump = ThreadDumpUtil.takeDumpJSONString();
         // Currently we only do sanity check for threads used in the execution engine.
         // Later we should check if there are leaked storage threads as well.
@@ -200,7 +200,7 @@
         }
     }
 
-    private static void checkOpenRunFileLeaks() throws IOException {
+    public static void checkOpenRunFileLeaks() throws IOException {
         if (SystemUtils.IS_OS_WINDOWS) {
             return;
         }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
new file mode 100644
index 0000000..5ee0e9f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.storage;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.comm.channels.NetworkOutputChannel;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.partitions.MaterializingPipelinedPartition;
+import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class DeallocatableTest {
+
+    @Before
+    public void setUp() throws Exception {
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    @Test
+    public void deallocateBeforeConsumerStart() throws Exception {
+        TestNodeController nc = new TestNodeController(null, false);
+        try {
+            nc.init();
+            final NodeControllerService ncs =
+                    (NodeControllerService) nc.getAppRuntimeContext().getServiceContext().getControllerService();
+            final TaskAttemptId taId = Mockito.mock(TaskAttemptId.class);
+            final IHyracksTaskContext ctx = nc.createTestContext(true);
+            final ConnectorDescriptorId codId = new ConnectorDescriptorId(1);
+            final PartitionId pid = new PartitionId(ctx.getJobletContext().getJobId(), codId, 1, 1);
+            final ChannelControlBlock ccb = ncs.getNetworkManager()
+                    .connect(NetworkingUtil.getSocketAddress(ncs.getNetworkManager().getLocalNetworkAddress()));
+            final NetworkOutputChannel networkOutputChannel = new NetworkOutputChannel(ccb, 0);
+            final MaterializingPipelinedPartition mpp =
+                    new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), pid, taId, ncs.getExecutor());
+            mpp.open();
+            // fill and write frame
+            final ByteBuffer frame = ctx.allocateFrame();
+            while (frame.hasRemaining()) {
+                frame.put((byte) 0);
+            }
+            frame.flip();
+            mpp.nextFrame(frame);
+            // close and deallocate before consumer thread starts
+            mpp.close();
+            mpp.deallocate();
+            // start the consumer thread after deallocate
+            mpp.writeTo(networkOutputChannel);
+            // give consumer thread chance to exit
+            TimeUnit.MILLISECONDS.sleep(100);
+            LangExecutionUtil.checkThreadLeaks();
+            LangExecutionUtil.checkOpenRunFileLeaks();
+        } finally {
+            nc.deInit();
+        }
+    }
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index 62c1e4a..c93920f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -23,12 +23,16 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
+import java.net.SocketAddress;
 import java.net.SocketException;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 import java.util.Enumeration;
+
+import org.apache.hyracks.api.comm.NetworkAddress;
 
 public class NetworkingUtil {
 
@@ -119,4 +123,8 @@
         int port = socketChannel.socket().getPort();
         return InetSocketAddress.createUnresolved(hostAddress, port);
     }
+
+    public static SocketAddress getSocketAddress(NetworkAddress netAddr) throws UnknownHostException {
+        return new InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), netAddr.getPort());
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java
index 782561b..bb3b3c8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java
@@ -25,7 +25,7 @@
 
 import org.apache.hyracks.api.io.IWritable;
 
-public final class TaskAttemptId implements IWritable, Serializable {
+public class TaskAttemptId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
     private TaskId taskId;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 5506a94..3582da2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -38,31 +38,19 @@
     private static final Logger LOGGER = Logger.getLogger(MaterializingPipelinedPartition.class.getName());
 
     private final IHyracksTaskContext ctx;
-
     private final Executor executor;
-
     private final IIOManager ioManager;
-
     private final PartitionManager manager;
-
     private final PartitionId pid;
-
     private final TaskAttemptId taId;
-
     private FileReference fRef;
-
     private IFileHandle writeHandle;
-
     private long size;
-
     private boolean eos;
-
     private boolean failed;
-
     protected boolean flushRequest;
-
+    private boolean deallocated;
     private Level openCloseLevel = Level.FINE;
-
     private Thread dataConsumerThread;
 
     public MaterializingPipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
@@ -89,6 +77,7 @@
         if (dataConsumerThread != null) {
             dataConsumerThread.interrupt();
         }
+        deallocated = true;
     }
 
     @Override
@@ -109,13 +98,18 @@
                     fRefCopy = fRef;
                 }
                 writer.open();
-                IFileHandle readHandle = fRefCopy == null ? null
-                        : ioManager.open(fRefCopy, IIOManager.FileReadWriteMode.READ_ONLY,
-                                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                IFileHandle readHandle = fRefCopy == null ? null :
+                        ioManager.open(fRefCopy, IIOManager.FileReadWriteMode.READ_ONLY,
+                                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
                 try {
                     if (readHandle == null) {
                         // Either fail() is called or close() is called with 0 tuples coming in.
                         return;
+                    }
+                    synchronized (MaterializingPipelinedPartition.this) {
+                        if (deallocated) {
+                            return;
+                        }
                     }
                     long offset = 0;
                     ByteBuffer buffer = ctx.allocateFrame();
@@ -192,6 +186,7 @@
         size = 0;
         eos = false;
         failed = false;
+        deallocated = false;
         manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2143

Gerrit-MessageType: merged
Gerrit-Change-Id: I18c9fb085c149f41a202fff83aa6ec3aaeba6a77
Gerrit-PatchSet: 6
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Dmitry Lychagin
Gerrit-Reviewer: Jenkins
Gerrit-Reviewer: Michael Blow
Gerrit-Reviewer: Murtadha Hubail
Gerrit-Reviewer: Till Westmann
Gerrit-Reviewer: abdullah alamoudi
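The getSocketAddress helper that this change adds to NetworkingUtil converts a NetworkAddress, which holds raw IP bytes and a port, into a resolved InetSocketAddress. The sketch below reproduces that conversion using only java.net. RawAddress is a hypothetical stand-in for org.apache.hyracks.api.comm.NetworkAddress and is assumed to expose the same lookupIpAddress() and getPort() accessors that the diff calls.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;

public class SocketAddressSketch {
    /** Hypothetical stand-in for Hyracks' NetworkAddress: raw IP bytes plus a port. */
    static final class RawAddress {
        private final byte[] ipAddress;
        private final int port;

        RawAddress(byte[] ipAddress, int port) {
            this.ipAddress = ipAddress.clone();
            this.port = port;
        }

        byte[] lookupIpAddress() {
            return ipAddress.clone();
        }

        int getPort() {
            return port;
        }
    }

    /** Same shape as the getSocketAddress helper added to NetworkingUtil. */
    static SocketAddress toSocketAddress(RawAddress addr) throws UnknownHostException {
        // getByAddress wraps the raw bytes without a name-service lookup; it only throws
        // UnknownHostException when the array is not 4 (IPv4) or 16 (IPv6) bytes long.
        return new InetSocketAddress(InetAddress.getByAddress(addr.lookupIpAddress()), addr.getPort());
    }

    public static void main(String[] args) throws UnknownHostException {
        InetSocketAddress sa =
                (InetSocketAddress) toSocketAddress(new RawAddress(new byte[] { 127, 0, 0, 1 }, 5000));
        // prints "127.0.0.1:5000 unresolved=false"
        System.out.println(sa.getAddress().getHostAddress() + ":" + sa.getPort() + " unresolved=" + sa.isUnresolved());
    }
}
```

Another NetworkingUtil method in the diff context uses InetSocketAddress.createUnresolved. Unlike that method, this helper returns an address whose isUnresolved() is false. The difference matters to callers that connect over NIO, because SocketChannel.connect throws UnresolvedAddressException when given an unresolved address.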