asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [1/9] asterixdb git commit: [ASTERIXDB-1992][ING] Suspend/Resume active entities
Date Thu, 27 Jul 2017 06:35:55 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master d57d81ff9 -> a85f4121f


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index dde3bad..fa6580e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -45,7 +45,7 @@ public class NodeManagerTest {
     @Test
     public void testNormal() throws HyracksException {
         IResourceManager resourceManager = new ResourceManager();
-        INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
+        INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager);
         NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
         NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
 
@@ -70,7 +70,7 @@ public class NodeManagerTest {
     @Test
     public void testException() throws HyracksException {
         IResourceManager resourceManager = new ResourceManager();
-        INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
+        INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager);
         NodeControllerState ncState1 = mockNodeControllerState(NODE1, true);
 
         boolean invalidNetworkAddress = false;
@@ -89,7 +89,7 @@ public class NodeManagerTest {
     @Test
     public void testNullNode() throws HyracksException {
         IResourceManager resourceManager = new ResourceManager();
-        INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
+        INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager);
 
         boolean invalidParameter = false;
         // Verifies states after a failure during adding nodes.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java
index 3bc549e..1d506ca 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java
@@ -59,10 +59,13 @@ public class ExceptionUtils {
         List<Exception> newExceptions = new ArrayList<>();
         for (Exception e : exceptions) {
             if (e instanceof HyracksDataException) {
-                newExceptions.add(HyracksDataException.create((HyracksDataException) e, nodeId));
+                if (((HyracksDataException) e).getNodeId() == null) {
+                    newExceptions.add(HyracksDataException.create((HyracksDataException)
e, nodeId));
+                } else {
+                    newExceptions.add(e);
+                }
             } else {
-                newExceptions.add(new HyracksDataException(ErrorCode.HYRACKS, ErrorCode.FAILURE_ON_NODE,
e.getMessage(),
-                        e, nodeId));
+                newExceptions.add(new HyracksDataException(ErrorCode.HYRACKS, ErrorCode.FAILURE_ON_NODE,
e, nodeId));
             }
         }
         exceptions.clear();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index fc911c0..b1f39f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -179,8 +179,8 @@ public class NodeControllerService implements IControllerService {
         this.application = application;
         id = ncConfig.getNodeId();
 
-        ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()),
-                application.getFileDeviceResolver());
+        ioManager =
+                new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver());
         if (id == null) {
             throw new HyracksException("id not set");
         }
@@ -274,8 +274,7 @@ public class NodeControllerService implements IControllerService {
         partitionManager = new PartitionManager(this);
         netManager = new NetworkManager(ncConfig.getDataListenAddress(), ncConfig.getDataListenPort(),
partitionManager,
                 ncConfig.getNetThreadCount(), ncConfig.getNetBufferCount(), ncConfig.getDataPublicAddress(),
-                ncConfig.getDataPublicPort(),
-                FullFrameChannelInterfaceFactory.INSTANCE);
+                ncConfig.getDataPublicPort(), FullFrameChannelInterfaceFactory.INSTANCE);
         netManager.start();
 
         startApplication();
@@ -288,16 +287,17 @@ public class NodeControllerService implements IControllerService {
         this.ccs = new ClusterControllerRemoteProxy(ipc,
                 new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()),
                 ncConfig.getClusterConnectRetries(), new IControllerRemoteProxyIPCEventListener()
{
-            @Override
-            public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
-                // we need to re-register in case the NC -> CC connection reset was due
to CC shutdown
-                try {
-                    registerNode();
-                } catch (Exception e) {
-                    throw new IPCException(e);
-                }
-            }
-        });
+                    @Override
+                    public void ipcHandleRestored(IIPCHandle handle) throws IPCException
{
+                        // we need to re-register in case of NC -> CC connection reset
+                        try {
+                            registerNode();
+                        } catch (Exception e) {
+                            LOGGER.log(Level.WARNING, "Failed Registering with cc", e);
+                            throw new IPCException(e);
+                        }
+                    }
+                });
         registerNode();
 
         workQueue.start();
@@ -332,15 +332,15 @@ public class NodeControllerService implements IControllerService {
         // Use "public" versions of network addresses and ports
         NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
-        NetworkAddress meesagingPort = messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress()
-                : null;
+        NetworkAddress meesagingPort =
+                messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress()
: null;
         int allCores = osMXBean.getAvailableProcessors();
         nodeRegistration = new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress,
datasetAddress,
-                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores,
-                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
-                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
-                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema,
meesagingPort,
-                application.getCapacity(), PidHelper.getPid(), maxJobId.get());
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores,
runtimeMXBean.getVmName(),
+                runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(),
+                runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
+                runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort, application.getCapacity(),
+                PidHelper.getPid(), maxJobId.get());
 
         ccs.registerNode(nodeRegistration);
 
@@ -572,12 +572,12 @@ public class NodeControllerService implements IControllerService {
 
     private static INCApplication getApplication(NCConfig config)
             throws ClassNotFoundException, IllegalAccessException, InstantiationException
{
-            if (config.getAppClass() != null) {
-                Class<?> c = Class.forName(config.getAppClass());
-                return (INCApplication) c.newInstance();
-            } else {
-                return BaseNCApplication.INSTANCE;
-            }
+        if (config.getAppClass() != null) {
+            Class<?> c = Class.forName(config.getAppClass());
+            return (INCApplication) c.newInstance();
+        } else {
+            return BaseNCApplication.INSTANCE;
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 361ee37..962d541 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -44,7 +44,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager
{
 
     private final Executor executor;
 
-    private final Map<JobId, IDatasetStateRecord> partitionResultStateMap;
+    private final Map<JobId, ResultSetMap> partitionResultStateMap;
 
     private final DefaultDeallocatableRegistry deallocatableRegistry;
 
@@ -76,8 +76,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager
{
             dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult,
partition, nPartitions,
                     datasetMemoryManager, fileFactory);
 
-            ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.computeIfAbsent(jobId,
-                    k -> new ResultSetMap());
+            ResultSetMap rsIdMap = partitionResultStateMap.computeIfAbsent(jobId, k ->
new ResultSetMap());
 
             ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, nPartitions);
             resultStates[partition] = dpw.getResultState();
@@ -122,7 +121,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager
{
 
     protected synchronized ResultState getResultState(JobId jobId, ResultSetId resultSetId,
int partition)
             throws HyracksException {
-        ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
+        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
         if (rsIdMap == null) {
             throw new HyracksException("Unknown JobId " + jobId);
         }
@@ -139,7 +138,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager
{
 
     @Override
     public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition)
{
-        ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
+        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
         if (rsIdMap != null && rsIdMap.removePartition(jobId, resultSetId, partition))
{
             partitionResultStateMap.remove(jobId);
         }
@@ -147,13 +146,20 @@ public class DatasetPartitionManager implements IDatasetPartitionManager
{
 
     @Override
     public synchronized void abortReader(JobId jobId) {
-        ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
+        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
         if (rsIdMap != null) {
             rsIdMap.abortAll();
         }
     }
 
     @Override
+    public synchronized void abortAllReaders() {
+        for (ResultSetMap rsIdMap : partitionResultStateMap.values()) {
+            rsIdMap.abortAll();
+        }
+    }
+
+    @Override
     public synchronized void close() {
         for (JobId jobId : getJobIds()) {
             deinit(jobId);
@@ -167,7 +173,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager
{
     }
 
     @Override
-    public IDatasetStateRecord getState(JobId jobId) {
+    public ResultSetMap getState(JobId jobId) {
         return partitionResultStateMap.get(jobId);
     }
 
@@ -187,7 +193,7 @@ public class DatasetPartitionManager implements IDatasetPartitionManager
{
     }
 
     private synchronized void deinit(JobId jobId) {
-        ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
+        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
         if (rsIdMap != null) {
             rsIdMap.closeAndDeleteAll();
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
index fd434d7..16e5027 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -39,9 +39,7 @@ public class PipelinedPartition implements IFrameWriter, IPartition {
 
     private IFrameWriter delegate;
 
-    private boolean pendingConnection;
-
-    private boolean failed;
+    private volatile boolean pendingConnection = true;
 
     public PipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId
pid, TaskAttemptId taId) {
         this.ctx = ctx;
@@ -74,16 +72,13 @@ public class PipelinedPartition implements IFrameWriter, IPartition {
     @Override
     public void open() throws HyracksDataException {
         manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
-        failed = false;
         pendingConnection = true;
         ensureConnected();
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        if (!failed) {
-            delegate.nextFrame(buffer);
-        }
+        delegate.nextFrame(buffer);
     }
 
     private void ensureConnected() throws HyracksDataException {
@@ -93,7 +88,8 @@ public class PipelinedPartition implements IFrameWriter, IPartition {
                     try {
                         wait();
                     } catch (InterruptedException e) {
-                        throw new HyracksDataException(e);
+                        Thread.currentThread().interrupt();
+                        throw HyracksDataException.create(e);
                     }
                 }
             }
@@ -104,22 +100,21 @@ public class PipelinedPartition implements IFrameWriter, IPartition
{
 
     @Override
     public void fail() throws HyracksDataException {
-        failed = true;
-        if (delegate != null) {
+        if (!pendingConnection) {
             delegate.fail();
         }
     }
 
     @Override
     public void close() throws HyracksDataException {
-        if (!failed) {
+        if (!pendingConnection) {
             delegate.close();
         }
     }
 
     @Override
     public void flush() throws HyracksDataException {
-        if (!failed) {
+        if (!pendingConnection) {
             delegate.flush();
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java
new file mode 100644
index 0000000..4fb4bf6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.work;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.hyracks.control.nc.Joblet;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.Task;
+
+public class AbortAllTasksWork extends SynchronizableWork {
+
+    private static final Logger LOGGER = Logger.getLogger(AbortAllTasksWork.class.getName());
+    private final NodeControllerService ncs;
+
+    public AbortAllTasksWork(NodeControllerService ncs) {
+        this.ncs = ncs;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Aborting all tasks");
+        }
+        IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
+        if (dpm != null) {
+            ncs.getDatasetPartitionManager().abortAllReaders();
+        }
+        for (Joblet ji : ncs.getJobletMap().values()) {
+            Map<TaskAttemptId, Task> taskMap = ji.getTaskMap();
+            for (Task task : taskMap.values()) {
+                if (task != null) {
+                    task.abort();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
index 07e1ad2..5870e76 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
@@ -65,8 +65,8 @@ public class AbortTasksWork extends AbstractWork {
                 }
             }
         } else {
-            LOGGER.log(Level.WARNING, "Joblet couldn't be found. Tasks of job " + jobId
-                    + " have all either completed or failed");
+            LOGGER.log(Level.WARNING,
+                    "Joblet couldn't be found. Tasks of job " + jobId + " have all either
completed or failed");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/FrameDebugUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/FrameDebugUtils.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/FrameDebugUtils.java
index 66e7ae0..d09b890 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/FrameDebugUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/FrameDebugUtils.java
@@ -39,11 +39,12 @@ public class FrameDebugUtils {
 
     /**
      * Debugging method
+     *
      * @param fta
      * @param recordDescriptor
      * @param prefix
      */
-    public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, String
prefix) {
+    public static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor,
String prefix) {
         try (ByteBufferInputStream bbis = new ByteBufferInputStream();
                 DataInputStream dis = new DataInputStream(bbis)) {
             int tc = fta.getTupleCount();
@@ -60,19 +61,21 @@ public class FrameDebugUtils {
 
     /**
      * Debugging method
+     *
      * @param fta
      * @param recordDescriptor
      */
-    public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor) {
+    public static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor)
{
         prettyPrint(fta, recordDescriptor, "");
     }
 
     /**
      * Debugging method
+     *
      * @param fta
      * @param operator
      */
-    public void prettyPrintTags(IFrameTupleAccessor fta, String operator) {
+    public static void prettyPrintTags(IFrameTupleAccessor fta, String operator) {
         try (ByteBufferInputStream bbis = new ByteBufferInputStream();
                 DataInputStream dis = new DataInputStream(bbis)) {
             int tc = fta.getTupleCount();
@@ -90,14 +93,15 @@ public class FrameDebugUtils {
 
     /**
      * Debugging method
+     *
      * @param fta
      * @param tid
      * @param bbis
      * @param dis
      * @param sb
      */
-    protected void prettyPrintTag(IFrameTupleAccessor fta, int tid, ByteBufferInputStream
bbis, DataInputStream dis,
-            StringBuilder sb) {
+    protected static void prettyPrintTag(IFrameTupleAccessor fta, int tid, ByteBufferInputStream
bbis,
+            DataInputStream dis, StringBuilder sb) {
         sb.append(" tid" + tid + ":(" + fta.getTupleStartOffset(tid) + ", " + fta.getTupleEndOffset(tid)
+ ")[");
         for (int j = 0; j < fta.getFieldCount(); ++j) {
             sb.append(" ");
@@ -115,6 +119,7 @@ public class FrameDebugUtils {
 
     /**
      * Debugging method
+     *
      * @param fta
      * @param recordDescriptor
      * @param tid
@@ -122,9 +127,8 @@ public class FrameDebugUtils {
      * @param dis
      * @param sb
      */
-    protected void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor,
int tid,
-            ByteBufferInputStream bbis, DataInputStream dis,
-            StringBuilder sb) {
+    protected static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor,
int tid,
+            ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb) {
         sb.append(" tid" + tid + ":(" + fta.getTupleStartOffset(tid) + ", " + fta.getTupleEndOffset(tid)
+ ")[");
         for (int j = 0; j < fta.getFieldCount(); ++j) {
             sb.append(" ");
@@ -133,8 +137,8 @@ public class FrameDebugUtils {
             }
             sb.append("f" + j + ":(" + fta.getFieldStartOffset(tid, j) + ", " + fta.getFieldEndOffset(tid,
j) + ") ");
             sb.append("{");
-            bbis.setByteBuffer(fta.getBuffer(), fta.getTupleStartOffset(tid) + fta.getFieldSlotsLength()
+ fta
-                    .getFieldStartOffset(tid, j));
+            bbis.setByteBuffer(fta.getBuffer(),
+                    fta.getTupleStartOffset(tid) + fta.getFieldSlotsLength() + fta.getFieldStartOffset(tid,
j));
             try {
                 sb.append(recordDescriptor.getFields()[j].deserialize(dis));
             } catch (Exception e) {
@@ -146,14 +150,14 @@ public class FrameDebugUtils {
         sb.append("\n");
     }
 
-
     /**
      * Debugging method
+     *
      * @param fta
      * @param recordDescriptor
      * @param tid
      */
-    public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int
tid) {
+    public static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor,
int tid) {
         try (ByteBufferInputStream bbis = new ByteBufferInputStream();
                 DataInputStream dis = new DataInputStream(bbis)) {
             StringBuilder sb = new StringBuilder();
@@ -169,13 +173,14 @@ public class FrameDebugUtils {
      * They are safe as they don't print records. Printing records
      * using IserializerDeserializer can print incorrect results or throw exceptions.
      * A better way yet would be to use record pointable.
+     *
      * @param fta
      * @param recordDescriptor
      * @param prefix
      * @param recordFields
      * @throws IOException
      */
-    public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, String
prefix,
+    public static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor,
String prefix,
             int[] recordFields) throws IOException {
         try (ByteBufferInputStream bbis = new ByteBufferInputStream();
                 DataInputStream dis = new DataInputStream(bbis)) {
@@ -191,14 +196,15 @@ public class FrameDebugUtils {
 
     /**
      * Debugging method
+     *
      * @param fta
      * @param recordDescriptor
      * @param tIdx
      * @param recordFields
      * @throws IOException
      */
-    public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, int
tIdx, int[] recordFields)
-            throws IOException {
+    public static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor,
int tIdx,
+            int[] recordFields) throws IOException {
         try (ByteBufferInputStream bbis = new ByteBufferInputStream();
                 DataInputStream dis = new DataInputStream(bbis)) {
             StringBuilder sb = new StringBuilder();
@@ -209,14 +215,14 @@ public class FrameDebugUtils {
 
     /**
      * Debugging method
+     *
      * @param tuple
      * @param fieldsIdx
      * @param descIdx
      * @throws HyracksDataException
      */
-    public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, ITupleReference
tuple,
-            int fieldsIdx, int descIdx)
-            throws HyracksDataException {
+    public static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor,
ITupleReference tuple,
+            int fieldsIdx, int descIdx) throws HyracksDataException {
         try (ByteBufferInputStream bbis = new ByteBufferInputStream();
                 DataInputStream dis = new DataInputStream(bbis)) {
             StringBuilder sb = new StringBuilder();
@@ -237,11 +243,12 @@ public class FrameDebugUtils {
 
     /**
      * Debugging method
+     *
      * @param tuple
      * @param descF
      * @throws HyracksDataException
      */
-    public void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor, ITupleReference
tuple,
+    public static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor,
ITupleReference tuple,
             int[] descF) throws HyracksDataException {
         try (ByteBufferInputStream bbis = new ByteBufferInputStream();
                 DataInputStream dis = new DataInputStream(bbis)) {
@@ -265,6 +272,7 @@ public class FrameDebugUtils {
 
     /**
      * Debugging method
+     *
      * @param fta
      * @param recordDescriptor
      * @param tid
@@ -274,17 +282,15 @@ public class FrameDebugUtils {
      * @param recordFields
      * @throws IOException
      */
-    protected void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor,
int tid,
-            ByteBufferInputStream bbis, DataInputStream dis,
-            StringBuilder sb,
-            int[] recordFields) throws IOException {
+    protected static void prettyPrint(IFrameTupleAccessor fta, RecordDescriptor recordDescriptor,
int tid,
+            ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb, int[] recordFields)
throws IOException {
         Arrays.sort(recordFields);
         sb.append(" tid" + tid + ":(" + fta.getTupleStartOffset(tid) + ", " + fta.getTupleEndOffset(tid)
+ ")[");
         for (int j = 0; j < fta.getFieldCount(); ++j) {
             sb.append("f" + j + ":(" + fta.getFieldStartOffset(tid, j) + ", " + fta.getFieldEndOffset(tid,
j) + ") ");
             sb.append("{");
-            bbis.setByteBuffer(fta.getBuffer(), fta.getTupleStartOffset(tid) + fta
-                    .getFieldSlotsLength() + fta.getFieldStartOffset(tid, j));
+            bbis.setByteBuffer(fta.getBuffer(),
+                    fta.getTupleStartOffset(tid) + fta.getFieldSlotsLength() + fta.getFieldStartOffset(tid,
j));
             if (Arrays.binarySearch(recordFields, j) >= 0) {
                 sb.append("{a record field: only print using pointable:");
                 sb.append("tag->" + dis.readByte() + "}");

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
index 0cc0170..d7d5c27 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
@@ -123,14 +123,15 @@ public class NonDeterministicChannelReader implements IInputChannelMonitor,
IPar
             try {
                 wait();
             } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
+                Thread.currentThread().interrupt();
+                throw HyracksDataException.create(e);
             }
         }
     }
 
     public synchronized void close() throws HyracksDataException {
-        for (int i = closedSenders.nextClearBit(0); i >= 0
-                && i < nSenderPartitions; i = closedSenders.nextClearBit(i + 1))
{
+        for (int i = closedSenders.nextClearBit(0); i >= 0 && i < nSenderPartitions;
i =
+                closedSenders.nextClearBit(i + 1)) {
             if (channels[i] != null) {
                 channels[i].close();
                 channels[i] = null;


Mime
View raw message