asterixdb-notifications mailing list archives

From "Murtadha Hubail (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [ASTERIXDB-1871][ASTERIXDB-2095] Stop Consumer Thread on Dea...
Date Fri, 10 Nov 2017 17:35:22 GMT
Murtadha Hubail has submitted this change and it was merged.

Change subject: [ASTERIXDB-1871][ASTERIXDB-2095] Stop Consumer Thread on Deallocate
......................................................................


[ASTERIXDB-1871][ASTERIXDB-2095] Stop Consumer Thread on Deallocate

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Currently, a MaterializingPipelinedPartition can be deallocated
  before its consuming thread starts (e.g. due to a job abort). In
  that case the consuming thread is never interrupted, which leaks
  threads and files. With this change, the consumer thread checks
  whether the partition was deallocated before it starts reading;
  if so, it cleans up any files and exits.
- Make TaskAttemptId a non-final class so that it can be mocked.
- Add a test case.
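
The race described above can be illustrated with a minimal, self-contained
sketch. This is not the Hyracks code: SketchPartition, runFile, consumed and
deallocateBeforeConsumerStart are hypothetical names used only for
illustration, and a temporary file stands in for the materialized run file.
It assumes the same idea as the change, namely a "deallocated" flag that is
set under the partition's lock and checked by the consumer before it reads.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class DeallocateSketch {

    /** Hypothetical stand-in for a materialized partition; not the Hyracks class. */
    static final class SketchPartition {
        private final Path runFile;
        private boolean deallocated;       // guarded by "this"
        private Thread consumerThread;     // guarded by "this"
        private volatile boolean consumed; // set only if the consumer reads data

        SketchPartition(Path runFile) {
            this.runFile = runFile;
        }

        synchronized void deallocate() {
            // Interrupting only helps if the consumer is already running.
            if (consumerThread != null) {
                consumerThread.interrupt();
            }
            // The flag covers the case where the consumer has not started yet.
            deallocated = true;
        }

        void writeTo(ExecutorService executor) {
            executor.execute(() -> {
                try {
                    synchronized (this) {
                        if (deallocated) {
                            return; // skip reading; the finally block still cleans up
                        }
                        consumerThread = Thread.currentThread();
                    }
                    consumed = true; // a real consumer would read frames here
                } finally {
                    synchronized (this) {
                        consumerThread = null;
                    }
                    try {
                        Files.deleteIfExists(runFile); // avoid leaking the run file
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        boolean consumed() {
            return consumed;
        }
    }

    /** Deallocates first, then starts the consumer, and reports whether it exited cleanly. */
    static boolean deallocateBeforeConsumerStart() throws IOException, InterruptedException {
        Path runFile = Files.createTempFile("partition", ".run");
        SketchPartition partition = new SketchPartition(runFile);
        partition.deallocate();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        partition.writeTo(executor);
        executor.shutdown();
        boolean finished = executor.awaitTermination(5, TimeUnit.SECONDS);
        return finished && !partition.consumed() && Files.notExists(runFile);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("clean exit: " + deallocateBeforeConsumerStart()); // prints "clean exit: true"
    }
}
```

In this sketch the executor is shut down and awaited so that the outcome is
deterministic; that is a choice of the example, not something taken from the
change itself.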

Change-Id: I18c9fb085c149f41a202fff83aa6ec3aaeba6a77
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2143
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
---
M asterixdb/asterix-app/pom.xml
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
6 files changed, 117 insertions(+), 19 deletions(-)

Approvals:
  Anon. E. Moose #1000171: 
  Till Westmann: Looks good to me, approved
  Jenkins: Verified; No violations found; ; Verified



diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index b039071..c162e14 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -577,5 +577,10 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-comm</artifactId>
+      <version>${hyracks.version}</version>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 03f42f5..d80eabc 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -189,7 +189,7 @@
         return num;
     }
 
-    private static void checkThreadLeaks() throws IOException {
+    public static void checkThreadLeaks() throws IOException {
         String threadDump = ThreadDumpUtil.takeDumpJSONString();
         // Currently we only do sanity check for threads used in the execution engine.
         // Later we should check if there are leaked storage threads as well.
@@ -200,7 +200,7 @@
         }
     }
 
-    private static void checkOpenRunFileLeaks() throws IOException {
+    public static void checkOpenRunFileLeaks() throws IOException {
         if (SystemUtils.IS_OS_WINDOWS) {
             return;
         }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
new file mode 100644
index 0000000..5ee0e9f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DeallocatableTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.storage;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.comm.channels.NetworkOutputChannel;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.partitions.MaterializingPipelinedPartition;
+import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class DeallocatableTest {
+
+    @Before
+    public void setUp() throws Exception {
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    @Test
+    public void deallocateBeforeConsumerStart() throws Exception {
+        TestNodeController nc = new TestNodeController(null, false);
+        try {
+            nc.init();
+            final NodeControllerService ncs =
+                    (NodeControllerService) nc.getAppRuntimeContext().getServiceContext().getControllerService();
+            final TaskAttemptId taId = Mockito.mock(TaskAttemptId.class);
+            final IHyracksTaskContext ctx = nc.createTestContext(true);
+            final ConnectorDescriptorId codId = new ConnectorDescriptorId(1);
+            final PartitionId pid = new PartitionId(ctx.getJobletContext().getJobId(), codId, 1, 1);
+            final ChannelControlBlock ccb = ncs.getNetworkManager()
+                    .connect(NetworkingUtil.getSocketAddress(ncs.getNetworkManager().getLocalNetworkAddress()));
+            final NetworkOutputChannel networkOutputChannel = new NetworkOutputChannel(ccb, 0);
+            final MaterializingPipelinedPartition mpp =
+                    new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), pid, taId, ncs.getExecutor());
+            mpp.open();
+            // fill and write frame
+            final ByteBuffer frame = ctx.allocateFrame();
+            while (frame.hasRemaining()) {
+                frame.put((byte) 0);
+            }
+            frame.flip();
+            mpp.nextFrame(frame);
+            // close and deallocate before consumer thread starts
+            mpp.close();
+            mpp.deallocate();
+            // start the consumer thread after deallocate
+            mpp.writeTo(networkOutputChannel);
+            // give consumer thread chance to exit
+            TimeUnit.MILLISECONDS.sleep(100);
+            LangExecutionUtil.checkThreadLeaks();
+            LangExecutionUtil.checkOpenRunFileLeaks();
+        } finally {
+            nc.deInit();
+        }
+    }
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index 62c1e4a..c93920f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -23,12 +23,16 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
+import java.net.SocketAddress;
 import java.net.SocketException;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 import java.util.Enumeration;
+
+import org.apache.hyracks.api.comm.NetworkAddress;
 
 public class NetworkingUtil {
 
@@ -119,4 +123,8 @@
         int port = socketChannel.socket().getPort();
         return InetSocketAddress.createUnresolved(hostAddress, port);
     }
+
+    public static SocketAddress getSocketAddress(NetworkAddress netAddr) throws UnknownHostException {
+        return new InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), netAddr.getPort());
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java
index 782561b..bb3b3c8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TaskAttemptId.java
@@ -25,7 +25,7 @@
 
 import org.apache.hyracks.api.io.IWritable;
 
-public final class TaskAttemptId implements IWritable, Serializable {
+public class TaskAttemptId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
     private TaskId taskId;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 5506a94..3582da2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -38,31 +38,19 @@
     private static final Logger LOGGER = Logger.getLogger(MaterializingPipelinedPartition.class.getName());
 
     private final IHyracksTaskContext ctx;
-
     private final Executor executor;
-
     private final IIOManager ioManager;
-
     private final PartitionManager manager;
-
     private final PartitionId pid;
-
     private final TaskAttemptId taId;
-
     private FileReference fRef;
-
     private IFileHandle writeHandle;
-
     private long size;
-
     private boolean eos;
-
     private boolean failed;
-
     protected boolean flushRequest;
-
+    private boolean deallocated;
     private Level openCloseLevel = Level.FINE;
-
     private Thread dataConsumerThread;
 
     public MaterializingPipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
@@ -89,6 +77,7 @@
         if (dataConsumerThread != null) {
             dataConsumerThread.interrupt();
         }
+        deallocated = true;
     }
 
     @Override
@@ -109,13 +98,18 @@
                         fRefCopy = fRef;
                     }
                     writer.open();
-                    IFileHandle readHandle = fRefCopy == null ? null
-                            : ioManager.open(fRefCopy, IIOManager.FileReadWriteMode.READ_ONLY,
-                                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                    IFileHandle readHandle = fRefCopy == null ? null :
+                            ioManager.open(fRefCopy, IIOManager.FileReadWriteMode.READ_ONLY,
+                                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
                     try {
                         if (readHandle == null) {
                             // Either fail() is called or close() is called with 0 tuples coming in.
                             return;
+                        }
+                        synchronized (MaterializingPipelinedPartition.this) {
+                            if (deallocated) {
+                                return;
+                            }
                         }
                         long offset = 0;
                         ByteBuffer buffer = ctx.allocateFrame();
@@ -192,6 +186,7 @@
         size = 0;
         eos = false;
         failed = false;
+        deallocated = false;
         manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2143
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I18c9fb085c149f41a202fff83aa6ec3aaeba6a77
Gerrit-PatchSet: 6
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhubail@apache.org>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Gerrit-Reviewer: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mblow@apache.org>
Gerrit-Reviewer: Murtadha Hubail <mhubail@apache.org>
Gerrit-Reviewer: Till Westmann <tillw@apache.org>
Gerrit-Reviewer: abdullah alamoudi <bamousaa@gmail.com>
