asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dl...@apache.org
Subject asterixdb git commit: [ASTERIXDB-1871][ASTERIXDB-2095] Stop Consumer Thread on Deallocate
Date Fri, 10 Nov 2017 17:49:52 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 39390edc9 -> c90f1e38a


[ASTERIXDB-1871][ASTERIXDB-2095] Stop Consumer Thread on Deallocate

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Currently there is a chance that a MaterializingPipelinedPartition
  is deallocated before the consuming thread starts (e.g. due to job
  abort). In that case the consuming thread is never interrupted,
  leading to leaked threads and files. This change checks whether the
  partition was deallocated before the consumer starts; if so, the
  consumer thread cleans up any files and then exits.
- Make TaskAttemptId a non-final class so that it can be mocked.
- Add test case.

Change-Id: I18c9fb085c149f41a202fff83aa6ec3aaeba6a77
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2143
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/c90f1e38
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/c90f1e38
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/c90f1e38

Branch: refs/heads/master
Commit: c90f1e38af3fc59a019a8b551eac43ea8ef3cdd5
Parents: 39390ed
Author: Murtadha Hubail <mhubail@apache.org>
Authored: Fri Nov 10 18:17:29 2017 +0300
Committer: Murtadha Hubail <mhubail@apache.org>
Committed: Fri Nov 10 09:35:21 2017 -0800

----------------------------------------------------------------------
 asterixdb/asterix-app/pom.xml                   |  5 ++
 .../asterix/test/runtime/LangExecutionUtil.java |  4 +-
 .../asterix/test/storage/DeallocatableTest.java | 90 ++++++++++++++++++++
 .../replication/management/NetworkingUtil.java  |  8 ++
 .../hyracks/api/dataflow/TaskAttemptId.java     |  2 +-
 .../MaterializingPipelinedPartition.java        | 27 +++---
 6 files changed, 117 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c90f1e38/asterixdb/asterix-app/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index b039071..c162e14 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -577,5 +577,10 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-comm</artifactId>
+      <version>${hyracks.version}</version>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c90f1e38/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 03f42f5..d80eabc 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -189,7 +189,7 @@ public class LangExecutionUtil {
         return num;
     }
 
-    private static void checkThreadLeaks() throws IOException {
+    public static void checkThreadLeaks() throws IOException {
         String threadDump = ThreadDumpUtil.takeDumpJSONString();
         // Currently we only do sanity check for threads used in the execution engine.
         // Later we should check if there are leaked storage threads as well.
@@ -200,7 +200,7 @@ public class LangExecutionUtil {
         }
     }
 
-    private static void checkOpenRunFileLeaks() throws IOException {
+    public static void checkOpenRunFileLeaks() throws IOException {
         if (SystemUtils.IS_OS_WINDOWS) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c90f1e38/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
new file mode 100644
index 0000000..5ee0e9f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.storage;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.comm.channels.NetworkOutputChannel;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.partitions.MaterializingPipelinedPartition;
+import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class DeallocatableTest {
+
+    @Before
+    public void setUp() throws Exception {
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    @Test
+    public void deallocateBeforeConsumerStart() throws Exception {
+        TestNodeController nc = new TestNodeController(null, false);
+        try {
+            nc.init();
+            final NodeControllerService ncs =
+                    (NodeControllerService) nc.getAppRuntimeContext().getServiceContext().getControllerService();
+            final TaskAttemptId taId = Mockito.mock(TaskAttemptId.class);
+            final IHyracksTaskContext ctx = nc.createTestContext(true);
+            final ConnectorDescriptorId codId = new ConnectorDescriptorId(1);
+            final PartitionId pid = new PartitionId(ctx.getJobletContext().getJobId(), codId, 1, 1);
+            final ChannelControlBlock ccb = ncs.getNetworkManager()
+                    .connect(NetworkingUtil.getSocketAddress(ncs.getNetworkManager().getLocalNetworkAddress()));
+            final NetworkOutputChannel networkOutputChannel = new NetworkOutputChannel(ccb, 0);
+            final MaterializingPipelinedPartition mpp =
+                    new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), pid, taId, ncs.getExecutor());
+            mpp.open();
+            // fill and write frame
+            final ByteBuffer frame = ctx.allocateFrame();
+            while (frame.hasRemaining()) {
+                frame.put((byte) 0);
+            }
+            frame.flip();
+            mpp.nextFrame(frame);
+            // close and deallocate before consumer thread starts
+            mpp.close();
+            mpp.deallocate();
+            // start the consumer thread after deallocate
+            mpp.writeTo(networkOutputChannel);
+            // give consumer thread chance to exit
+            TimeUnit.MILLISECONDS.sleep(100);
+            LangExecutionUtil.checkThreadLeaks();
+            LangExecutionUtil.checkOpenRunFileLeaks();
+        } finally {
+            nc.deInit();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c90f1e38/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index 62c1e4a..c93920f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -23,13 +23,17 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
+import java.net.SocketAddress;
 import java.net.SocketException;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 import java.util.Enumeration;
 
+import org.apache.hyracks.api.comm.NetworkAddress;
+
 public class NetworkingUtil {
 
     private NetworkingUtil() {
@@ -119,4 +123,8 @@ public class NetworkingUtil {
         int port = socketChannel.socket().getPort();
         return InetSocketAddress.createUnresolved(hostAddress, port);
     }
+
+    public static SocketAddress getSocketAddress(NetworkAddress netAddr) throws UnknownHostException {
+        return new InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), netAddr.getPort());
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c90f1e38/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java
index 782561b..bb3b3c8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java
@@ -25,7 +25,7 @@ import java.io.Serializable;
 
 import org.apache.hyracks.api.io.IWritable;
 
-public final class TaskAttemptId implements IWritable, Serializable {
+public class TaskAttemptId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
     private TaskId taskId;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c90f1e38/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 5506a94..3582da2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -38,31 +38,19 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
     private static final Logger LOGGER = Logger.getLogger(MaterializingPipelinedPartition.class.getName());
 
     private final IHyracksTaskContext ctx;
-
     private final Executor executor;
-
     private final IIOManager ioManager;
-
     private final PartitionManager manager;
-
     private final PartitionId pid;
-
     private final TaskAttemptId taId;
-
     private FileReference fRef;
-
     private IFileHandle writeHandle;
-
     private long size;
-
     private boolean eos;
-
     private boolean failed;
-
     protected boolean flushRequest;
-
+    private boolean deallocated;
     private Level openCloseLevel = Level.FINE;
-
     private Thread dataConsumerThread;
 
     public MaterializingPipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
@@ -89,6 +77,7 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
         if (dataConsumerThread != null) {
             dataConsumerThread.interrupt();
         }
+        deallocated = true;
     }
 
     @Override
@@ -109,14 +98,19 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
                         fRefCopy = fRef;
                     }
                     writer.open();
-                    IFileHandle readHandle = fRefCopy == null ? null
-                            : ioManager.open(fRefCopy, IIOManager.FileReadWriteMode.READ_ONLY,
-                                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                    IFileHandle readHandle = fRefCopy == null ? null :
+                            ioManager.open(fRefCopy, IIOManager.FileReadWriteMode.READ_ONLY,
+                                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
                     try {
                         if (readHandle == null) {
                             // Either fail() is called or close() is called with 0 tuples coming in.
                             return;
                         }
+                        synchronized (MaterializingPipelinedPartition.this) {
+                            if (deallocated) {
+                                return;
+                            }
+                        }
                         long offset = 0;
                         ByteBuffer buffer = ctx.allocateFrame();
                         boolean done = false;
@@ -192,6 +186,7 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
         size = 0;
         eos = false;
         failed = false;
+        deallocated = false;
         manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
     }
 


Mime
View raw message